diff --git a/python/tests/test_disk_graph.py b/python/tests/test_disk_graph.py index 8d4a032f0..4b004feb6 100644 --- a/python/tests/test_disk_graph.py +++ b/python/tests/test_disk_graph.py @@ -1,4 +1,4 @@ -from raphtory import PyDirection, DiskGraphStorage +from raphtory import DiskGraphStorage from raphtory import algorithms import pandas as pd import tempfile @@ -35,17 +35,13 @@ ).sort_values(["src", "dst", "time"]) -def create_graph(edges, dir): - return DiskGraphStorage.load_from_pandas(dir, edges, "src", "dst", "time") - - # in every test, use `with` to create a temporary directory that will be deleted automatically # after the `with` block ends - def test_counts(): - dir = tempfile.TemporaryDirectory() - graph = create_graph(edges, dir.name).to_events() + graph_dir = tempfile.TemporaryDirectory() + graph = DiskGraphStorage.load_from_pandas(graph_dir.name, edges, "src", "dst", "time") + graph = graph.to_events() assert graph.count_nodes() == 5 assert graph.count_edges() == 20 @@ -140,6 +136,7 @@ def test_disk_graph(): ) assert len(list(actual.get_all_with_names())) == 1624 + def test_disk_graph_type_filter(): curr_dir = os.path.dirname(os.path.abspath(__file__)) rsc_dir = os.path.join(curr_dir, "..", "..", "pometry-storage-private", "resources") diff --git a/raphtory-graphql/Cargo.toml b/raphtory-graphql/Cargo.toml index ba20d8ee8..616c5288d 100644 --- a/raphtory-graphql/Cargo.toml +++ b/raphtory-graphql/Cargo.toml @@ -51,6 +51,8 @@ base64-compat = { workspace = true } time = { workspace = true } reqwest = { workspace = true } moka = { workspace = true } +kdam = { workspace = true } + # python binding optional dependencies pyo3 = { workspace = true, optional = true } diff --git a/raphtory-graphql/src/model/graph/graphs.rs b/raphtory-graphql/src/model/graph/graphs.rs index 3b7400b42..e980452b0 100644 --- a/raphtory-graphql/src/model/graph/graphs.rs +++ b/raphtory-graphql/src/model/graph/graphs.rs @@ -1,5 +1,4 @@ use crate::data::get_graph_name; -use async_graphql::parser::Error; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use std::path::PathBuf; diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 8a2b23efd..66f5dae49 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -41,6 +41,8 @@ quad-rand = { workspace = true } serde_json = { workspace = true } ouroboros = { workspace = true } either = { workspace = true } +kdam = { workspace = true } + # io optional dependencies csv = { workspace = true, optional = true } @@ -66,7 +68,6 @@ display-error-chain = { workspace = true, optional = true } polars-arrow = { workspace = true, optional = true } polars-parquet = { workspace = true, optional = true } polars-utils = { workspace = true, optional = true } -kdam = { workspace = true, optional = true } memmap2 = { workspace = true, optional = true } ahash = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } @@ -125,8 +126,7 @@ python = [ "polars-arrow?/compute", "raphtory-api/python", "dep:rpds", - "dep:kdam", - "kdam?/notebook" + "kdam/notebook" ] # storage diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index 8f41d0ac8..7fad87b50 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -1,6 +1,8 @@ use crate::core::{utils::time::error::ParseTimeError, Prop, PropType}; #[cfg(feature = "arrow")] use polars_arrow::legacy::error; +#[cfg(feature = "python")] +use pyo3::PyErr; use raphtory_api::core::{entities::GID, storage::arc_str::ArcStr}; use
std::path::PathBuf; #[cfg(feature = "search")] @@ -185,6 +187,12 @@ pub enum GraphError { #[error("Immutable graph is .. immutable!")] AttemptToMutateImmutableGraph, + + #[cfg(feature = "python")] + #[error("Python error occurred: {0}")] + PythonError(#[from] PyErr), + #[error("An error with tqdm occurred")] + TqdmError, } impl GraphError { diff --git a/raphtory/src/io/arrow/dataframe.rs b/raphtory/src/io/arrow/dataframe.rs index 2b8173f5e..2090d727c 100644 --- a/raphtory/src/io/arrow/dataframe.rs +++ b/raphtory/src/io/arrow/dataframe.rs @@ -10,18 +10,22 @@ use polars_arrow::{ use itertools::Itertools; -#[derive(Debug)] -pub(crate) struct DFView { - pub(crate) names: Vec, - pub(crate) arrays: Vec>>, +pub(crate) struct DFView { + pub names: Vec, + pub(crate) chunks: I, + pub num_rows: usize, } -impl DFView { - pub(crate) fn get_inner_size(&self) -> usize { - if self.arrays.is_empty() || self.arrays[0].is_empty() { - return 0; +impl DFView +where + I: Iterator>, +{ + pub(crate) fn new(names: Vec, chunks: I, num_rows: usize) -> Self { + Self { + names, + chunks, + num_rows, } - self.arrays[0][0].len() } pub fn check_cols_exist(&self, cols: &[&str]) -> Result<(), GraphError> { @@ -36,66 +40,58 @@ impl DFView { Ok(()) } + pub(crate) fn get_index(&self, name: &str) -> Result { + self.names + .iter() + .position(|n| n == name) + .ok_or_else(|| GraphError::ColumnDoesNotExist(name.to_string())) + } +} + +#[derive(Clone)] +pub(crate) struct DFChunk { + pub(crate) chunk: Vec>, +} + +impl DFChunk { pub(crate) fn iter_col( &self, - name: &str, + idx: usize, ) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; - - let _ = (&self.arrays[0])[idx] + let col_arr = (&self.chunk)[idx] .as_any() .downcast_ref::>()?; - - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.iter() - }); - - Some(iter) + Some(col_arr.iter()) } - pub fn utf8(&self, name: &str) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; + pub fn utf8(&self, idx: usize) -> Option> + '_> { // test that it's actually a utf8 array - let _ = (&self.arrays[0])[idx] - .as_any() - .downcast_ref::>()?; - - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.iter() - }); + let col_arr = (&self.chunk)[idx].as_any().downcast_ref::>()?; - Some(iter) + Some(col_arr.iter()) } - pub fn time_iter_col(&self, name: &str) -> Option> + '_> { - let idx = self.names.iter().position(|n| n == name)?; - - let _ = (&self.arrays[0])[idx] + pub fn time_iter_col(&self, idx: usize) -> Option> + '_> { + let col_arr = (&self.chunk)[idx] .as_any() .downcast_ref::>()?; - let iter = self.arrays.iter().flat_map(move |arr| { - let arr = &arr[idx]; - let arr = if let DataType::Timestamp(_, _) = arr.data_type() { - let array = cast::cast( - &*arr.clone(), - &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())), - CastOptions::default(), - ) - .unwrap(); - array - } else { - arr.clone() - }; - - let arr = arr.as_any().downcast_ref::>().unwrap(); - arr.clone().into_iter() - }); - - Some(iter) + let arr = if let DataType::Timestamp(_, _) = col_arr.data_type() { + let array = cast::cast( + col_arr, + &DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())), + CastOptions::default(), + ) + .unwrap(); + array + .as_any() + .downcast_ref::>() + .unwrap() + .clone() + } else { + col_arr.clone() + }; + + Some(arr.into_iter()) } } diff --git
a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index b05806af2..9de175c55 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -4,352 +4,397 @@ use crate::{ mutation::{internal::*, AdditionOps}, view::StaticGraphViewOps, }, - io::arrow::{dataframe::DFView, prop_handler::*}, + io::arrow::{ + dataframe::{DFChunk, DFView}, + prop_handler::*, + }, prelude::*, }; -#[cfg(feature = "python")] -use kdam::tqdm; + +use kdam::{Bar, BarBuilder, BarExt}; use std::{collections::HashMap, iter}; -#[cfg(feature = "python")] -macro_rules! maybe_tqdm { - ($iter:expr, $size:expr, $desc:literal) => { - tqdm!( - $iter, - desc = "Loading nodes", - total = $size, - animation = kdam::Animation::FillUp, - unit_scale = true - ) - }; +fn build_progress_bar(des: String, num_rows: usize) -> Result { + BarBuilder::default() + .desc(des) + .animation(kdam::Animation::FillUp) + .total(num_rows) + .unit_scale(true) + .build() + .map_err(|_| GraphError::TqdmError) } - -#[cfg(not(feature = "python"))] -macro_rules! maybe_tqdm { - ($iter:expr, $size:expr, $desc:literal) => { - $iter - }; +fn extract_out_default_type(n_t: Option<&str>) -> Option<&str> { + if n_t == Some("_default") { + None + } else { + n_t + } } pub(crate) fn load_nodes_from_df< 'a, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - df: &'a DFView, - size: usize, + df_view: DFView>>, node_id: &str, time: &str, - properties: Option>, - const_properties: Option>, - shared_const_properties: Option>, + properties: Option<&[&str]>, + const_properties: Option<&[&str]>, + shared_const_properties: Option<&HashMap>, node_type: Option<&str>, node_type_in_df: bool, graph: &G, ) -> Result<(), GraphError> { - let (prop_iter, const_prop_iter) = get_prop_rows(df, properties, const_properties)?; - - let node_type: Box>> = match node_type { - Some(node_type) => { - if node_type_in_df { - let iter_res: Result>>, GraphError> = - if let Some(node_types) = df.utf8::(node_type) { - Ok(Box::new(node_types)) - } else if let Some(node_types) = df.utf8::(node_type) { - Ok(Box::new(node_types)) - } else { - Err(GraphError::LoadFailure( - "Unable to convert / find node_type column in dataframe.".to_string(), - )) - }; - iter_res? 
- } else { - Box::new(iter::repeat(Some(node_type))) - } - } - None => Box::new(iter::repeat(None)), - }; - - if let (Some(node_id), Some(time)) = (df.iter_col::(node_id), df.time_iter_col(time)) { - let iter = node_id - .map(|i| i.copied()) - .zip(time) - .zip(node_type) - .map(|((node_id, time), n_t)| (node_id, time, n_t)); - load_nodes_from_num_iter( - graph, - size, - iter, - prop_iter, - const_prop_iter, - shared_const_properties, - )?; - } else if let (Some(node_id), Some(time)) = - (df.iter_col::(node_id), df.time_iter_col(time)) - { - let iter = node_id.map(i64_opt_into_u64_opt).zip(time); - let iter = iter - .zip(node_type) - .map(|((node_id, time), n_t)| (node_id, time, n_t)); - - load_nodes_from_num_iter( - graph, - size, - iter, - prop_iter, - const_prop_iter, - shared_const_properties, - )?; - } else if let (Some(node_id), Some(time)) = (df.utf8::(node_id), df.time_iter_col(time)) { - let iter = node_id.into_iter().zip(time); - let iter = iter - .zip(node_type) - .map(|((node_id, time), n_t)| (node_id, time, n_t)); - - let iter = maybe_tqdm!( - iter.zip(prop_iter).zip(const_prop_iter), - size, - "Loading nodes" - ); - - for (((node_id, time, n_t), props), const_props) in iter { - if let (Some(node_id), Some(time), n_t) = (node_id, time, n_t) { - let actual_type = extract_out_default_type(n_t); - let v = graph.add_node(time, node_id, props, actual_type)?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props.iter())?; + let properties = properties.unwrap_or(&[]); + let const_properties = const_properties.unwrap_or(&[]); + + let properties_indices = properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + let const_properties_indices = const_properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + + let node_type_index = node_type + .filter(|_| node_type_in_df) + .map(|node_type| df_view.get_index(node_type)) + .transpose()?; + let node_id_index = df_view.get_index(node_id)?; + let time_index = df_view.get_index(time)?; + + let mut pb = build_progress_bar("Loading nodes".to_string(), df_view.num_rows)?; + + for chunk in df_view.chunks { + let df = chunk?; + let prop_iter = combine_properties(properties, &properties_indices, &df)?; + let const_prop_iter = combine_properties(const_properties, &const_properties_indices, &df)?; + + let node_type: Box>> = match node_type { + Some(node_type) => match node_type_index { + Some(index) => { + let iter_res: Result>>, GraphError> = + if let Some(node_types) = df.utf8::(index) { + Ok(Box::new(node_types)) + } else if let Some(node_types) = df.utf8::(index) { + Ok(Box::new(node_types)) + } else { + Err(GraphError::LoadFailure( + "Unable to convert / find node_type column in dataframe." + .to_string(), + )) + }; + iter_res? 
+ } + None => Box::new(iter::repeat(Some(node_type))), + }, + None => Box::new(iter::repeat(None)), + }; + + if let (Some(node_id), Some(time)) = ( + df.iter_col::(node_id_index), + df.time_iter_col(time_index), + ) { + let iter = node_id + .map(|i| i.copied()) + .zip(time) + .zip(node_type) + .map(|((node_id, time), n_t)| (node_id, time, n_t)); + load_nodes_from_num_iter( + graph, + &mut pb, + iter, + prop_iter, + const_prop_iter, + shared_const_properties, + )?; + } else if let (Some(node_id), Some(time)) = ( + df.iter_col::(node_id_index), + df.time_iter_col(time_index), + ) { + let iter = node_id.map(i64_opt_into_u64_opt).zip(time); + let iter = iter + .zip(node_type) + .map(|((node_id, time), n_t)| (node_id, time, n_t)); + + load_nodes_from_num_iter( + graph, + &mut pb, + iter, + prop_iter, + const_prop_iter, + shared_const_properties, + )?; + } else if let (Some(node_id), Some(time)) = + (df.utf8::(node_id_index), df.time_iter_col(time_index)) + { + let iter = node_id.into_iter().zip(time); + let iter = iter + .zip(node_type) + .map(|((node_id, time), n_t)| (node_id, time, n_t)); + + for (((node_id, time, n_t), props), const_props) in + iter.zip(prop_iter).zip(const_prop_iter) + { + if let (Some(node_id), Some(time), n_t) = (node_id, time, n_t) { + let actual_type = extract_out_default_type(n_t); + let v = graph.add_node(time, node_id, props, actual_type)?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = &shared_const_properties { + v.add_constant_properties(shared_const_props.iter())?; + } } + let _ = pb.update(1); } - } - } else if let (Some(node_id), Some(time)) = (df.utf8::(node_id), df.time_iter_col(time)) { - let iter = node_id.into_iter().zip(time); - let iter = iter - .zip(node_type) - .map(|((node_id, time), n_t)| (node_id, time, n_t)); - - let iter = maybe_tqdm!( - iter.zip(prop_iter).zip(const_prop_iter), - size, - "Loading nodes" - ); - - for (((node_id, time, n_t), props), const_props) in iter { - let actual_type = extract_out_default_type(n_t); - if let (Some(node_id), Some(time), n_t) = (node_id, time, actual_type) { - let v = graph.add_node(time, node_id, props, n_t)?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props)?; + } else if let (Some(node_id), Some(time)) = + (df.utf8::(node_id_index), df.time_iter_col(time_index)) + { + let iter = node_id.into_iter().zip(time); + let iter = iter + .zip(node_type) + .map(|((node_id, time), n_t)| (node_id, time, n_t)); + + for (((node_id, time, n_t), props), const_props) in + iter.zip(prop_iter).zip(const_prop_iter) + { + let actual_type = extract_out_default_type(n_t); + if let (Some(node_id), Some(time), n_t) = (node_id, time, actual_type) { + let v = graph.add_node(time, node_id, props, n_t)?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = shared_const_properties { + v.add_constant_properties(shared_const_props)?; + } } + let _ = pb.update(1); } - } - } else { - return Err(GraphError::LoadFailure( - "node id column must be either u64 or text, time column must be i64. Ensure these contain no NaN, Null or None values.".to_string(), - )); + } else { + return Err(GraphError::LoadFailure( + "node id column must be either u64 or text, time column must be i64. 
Ensure these contain no NaN, Null or None values.".to_string(), + )); + }; } - Ok(()) } -fn extract_out_default_type(n_t: Option<&str>) -> Option<&str> { - if n_t == Some("_default") { - None - } else { - n_t - } -} - pub(crate) fn load_edges_from_df< 'a, - S: AsRef, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - df: &'a DFView, - size: usize, + df_view: DFView>>, src: &str, dst: &str, time: &str, - properties: Option>, - const_properties: Option>, - shared_const_properties: Option>, - layer: Option, + properties: Option<&[&str]>, + const_properties: Option<&[&str]>, + shared_const_properties: Option<&HashMap>, + layer: Option<&str>, layer_in_df: bool, graph: &G, ) -> Result<(), GraphError> { - let (prop_iter, const_prop_iter) = get_prop_rows(df, properties, const_properties)?; - let layer = lift_layer(layer, layer_in_df, df); - - if let (Some(src), Some(dst), Some(time)) = ( - df.iter_col::(src), - df.iter_col::(dst), - df.time_iter_col(time), - ) { - let triplets = src - .map(|i| i.copied()) - .zip(dst.map(|i| i.copied())) - .zip(time); - load_edges_from_num_iter( - graph, - size, - triplets, - prop_iter, - const_prop_iter, - shared_const_properties, - layer, - )?; - } else if let (Some(src), Some(dst), Some(time)) = ( - df.iter_col::(src), - df.iter_col::(dst), - df.time_iter_col(time), - ) { - let triplets = src - .map(i64_opt_into_u64_opt) - .zip(dst.map(i64_opt_into_u64_opt)) - .zip(time); - load_edges_from_num_iter( - graph, - size, - triplets, - prop_iter, - const_prop_iter, - shared_const_properties, - layer, - )?; - } else if let (Some(src), Some(dst), Some(time)) = ( - df.utf8::(src), - df.utf8::(dst), - df.time_iter_col(time), - ) { - let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - - let iter = maybe_tqdm!( - triplets.zip(prop_iter).zip(const_prop_iter).zip(layer), - size, - "Loading edges" - ); - - for (((((src, dst), time), props), const_props), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + let properties = properties.unwrap_or(&[]); + let const_properties = const_properties.unwrap_or(&[]); + + let properties_indices = properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + let const_properties_indices = const_properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + + let src_index = df_view.get_index(src)?; + let dst_index = df_view.get_index(dst)?; + let time_index = df_view.get_index(time)?; + let layer_index = layer + .filter(|_| layer_in_df) + .map(|layer| df_view.get_index(layer.as_ref())) + .transpose()?; + + let mut pb = build_progress_bar("Loading edges".to_string(), df_view.num_rows)?; + + for chunk in df_view.chunks { + let df = chunk?; + let prop_iter = combine_properties(properties, &properties_indices, &df)?; + let const_prop_iter = combine_properties(const_properties, &const_properties_indices, &df)?; + + let layer = lift_layer(layer, layer_index, &df); + + if let (Some(src), Some(dst), Some(time)) = ( + df.iter_col::(src_index), + df.iter_col::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src + .map(|i| i.copied()) + .zip(dst.map(|i| i.copied())) + .zip(time); + load_edges_from_num_iter( + graph, + &mut pb, 
+ triplets, + prop_iter, + const_prop_iter, + shared_const_properties, + layer, + )?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.iter_col::(src_index), + df.iter_col::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src + .map(i64_opt_into_u64_opt) + .zip(dst.map(i64_opt_into_u64_opt)) + .zip(time); + load_edges_from_num_iter( + graph, + &mut pb, + triplets, + prop_iter, + const_prop_iter, + shared_const_properties, + layer, + )?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.utf8::(src_index), + df.utf8::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); + + for (((((src, dst), time), props), const_props), layer) in + triplets.zip(prop_iter).zip(const_prop_iter).zip(layer) + { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst), Some(time)) = ( - df.utf8::(src), - df.utf8::(dst), - df.time_iter_col(time), - ) { - let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - let iter = maybe_tqdm!( - triplets.zip(prop_iter).zip(const_prop_iter).zip(layer), - size, - "Loading edges" - ); - - for (((((src, dst), time), props), const_props), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.utf8::(src_index), + df.utf8::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); + for (((((src, dst), time), props), const_props), layer) in + triplets.zip(prop_iter).zip(const_prop_iter).zip(layer) + { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else { - return Err(GraphError::LoadFailure( - "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." - .to_string(), - )); + } else { + return Err(GraphError::LoadFailure( + "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." 
+ .to_string(), + )); + }; } Ok(()) } pub(crate) fn load_edges_deletions_from_df< 'a, - S: AsRef, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + DeletionOps, >( - df: &'a DFView, - size: usize, + df_view: DFView>>, src: &str, dst: &str, time: &str, - layer: Option, + layer: Option<&str>, layer_in_df: bool, graph: &G, ) -> Result<(), GraphError> { - let layer = lift_layer(layer, layer_in_df, df); - - if let (Some(src), Some(dst), Some(time)) = ( - df.iter_col::(src), - df.iter_col::(dst), - df.time_iter_col(time), - ) { - let triplets = src - .map(|i| i.copied()) - .zip(dst.map(|i| i.copied())) - .zip(time); - - let iter = maybe_tqdm!(triplets.zip(layer), size, "Loading edges"); - - for (((src, dst), time), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - graph.delete_edge(time, src, dst, layer.as_deref())?; + let src_index = df_view.get_index(src)?; + let dst_index = df_view.get_index(dst)?; + let time_index = df_view.get_index(time)?; + let layer_index = layer + .filter(|_| layer_in_df) + .map(|layer| df_view.get_index(layer.as_ref())) + .transpose()?; + + let mut pb = build_progress_bar("Loading edge deletions".to_string(), df_view.num_rows)?; + + for chunk in df_view.chunks { + let df = chunk?; + let layer = lift_layer(layer, layer_index, &df); + + if let (Some(src), Some(dst), Some(time)) = ( + df.iter_col::(src_index), + df.iter_col::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src + .map(|i| i.copied()) + .zip(dst.map(|i| i.copied())) + .zip(time); + + for (((src, dst), time), layer) in triplets.zip(layer) { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + graph.delete_edge(time, src, dst, layer.as_deref())?; + } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst), Some(time)) = ( - df.iter_col::(src), - df.iter_col::(dst), - df.time_iter_col(time), - ) { - let triplets = src - .map(i64_opt_into_u64_opt) - .zip(dst.map(i64_opt_into_u64_opt)) - .zip(time); - - let iter = maybe_tqdm!(triplets.zip(layer), size, "Loading edges"); - - for (((src, dst), time), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - graph.delete_edge(time, src, dst, layer.as_deref())?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.iter_col::(src_index), + df.iter_col::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src + .map(i64_opt_into_u64_opt) + .zip(dst.map(i64_opt_into_u64_opt)) + .zip(time); + + for (((src, dst), time), layer) in triplets.zip(layer) { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + graph.delete_edge(time, src, dst, layer.as_deref())?; + } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst), Some(time)) = ( - df.utf8::(src), - df.utf8::(dst), - df.time_iter_col(time), - ) { - let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - let iter = maybe_tqdm!(triplets.zip(layer), size, "Loading edges"); - - for (((src, dst), time), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - graph.delete_edge(time, src, dst, layer.as_deref())?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.utf8::(src_index), + df.utf8::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); + for (((src, dst), time), layer) in triplets.zip(layer) { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + graph.delete_edge(time, src, dst, 
layer.as_deref())?; + } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst), Some(time)) = ( - df.utf8::(src), - df.utf8::(dst), - df.time_iter_col(time), - ) { - let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - let iter = maybe_tqdm!(triplets.zip(layer), size, "Loading edges"); - - for (((src, dst), time), layer) in iter { - if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { - graph.delete_edge(time, src, dst, layer.as_deref())?; + } else if let (Some(src), Some(dst), Some(time)) = ( + df.utf8::(src_index), + df.utf8::(dst_index), + df.time_iter_col(time_index), + ) { + let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); + + for (((src, dst), time), layer) in triplets.zip(layer) { + if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { + graph.delete_edge(time, src, dst, layer.as_deref())?; + } + let _ = pb.update(1); } - } - } else { - return Err(GraphError::LoadFailure( - "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." - .to_string(), - )); + } else { + return Err(GraphError::LoadFailure( + "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." + .to_string(), + )); + }; } + Ok(()) } @@ -357,190 +402,203 @@ pub(crate) fn load_node_props_from_df< 'a, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - df: &'a DFView, - size: usize, + df_view: DFView>>, node_id: &str, - const_properties: Option>, - shared_const_properties: Option>, + const_properties: Option<&[&str]>, + shared_const_properties: Option<&HashMap>, graph: &G, ) -> Result<(), GraphError> { - let (_, const_prop_iter) = get_prop_rows(df, None, const_properties)?; - - if let Some(node_id) = df.iter_col::(node_id) { - let iter = node_id.map(|i| i.copied()); - let iter = maybe_tqdm!(iter.zip(const_prop_iter), size, "Loading node properties"); - - for (node_id, const_props) in iter { - if let Some(node_id) = node_id { - let v = graph - .node(node_id) - .ok_or(GraphError::NodeIdError(node_id))?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props.iter())?; + let const_properties = const_properties.unwrap_or(&[]); + let const_properties_indices = const_properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + let node_id_index = df_view.get_index(node_id)?; + let mut pb = build_progress_bar("Loading node properties".to_string(), df_view.num_rows)?; + + for chunk in df_view.chunks { + let df = chunk?; + let const_prop_iter = combine_properties(const_properties, &const_properties_indices, &df)?; + + if let Some(node_id) = df.iter_col::(node_id_index) { + let iter = node_id.map(|i| i.copied()); + for (node_id, const_props) in iter.zip(const_prop_iter) { + if let Some(node_id) = node_id { + let v = graph + .node(node_id) + .ok_or(GraphError::NodeIdError(node_id))?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = &shared_const_properties { + v.add_constant_properties(shared_const_props.iter())?; + } } + let _ = pb.update(1); } - } - } else if let Some(node_id) = df.iter_col::(node_id) { - let iter = node_id.map(i64_opt_into_u64_opt); - let iter = maybe_tqdm!(iter.zip(const_prop_iter), size, "Loading node properties"); - - for (node_id, const_props) in iter { - if let Some(node_id) = node_id { - let v = 
graph - .node(node_id) - .ok_or(GraphError::NodeIdError(node_id))?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props.iter())?; + } else if let Some(node_id) = df.iter_col::(node_id_index) { + let iter = node_id.map(i64_opt_into_u64_opt); + for (node_id, const_props) in iter.zip(const_prop_iter) { + if let Some(node_id) = node_id { + let v = graph + .node(node_id) + .ok_or(GraphError::NodeIdError(node_id))?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = &shared_const_properties { + v.add_constant_properties(shared_const_props.iter())?; + } } + let _ = pb.update(1); } - } - } else if let Some(node_id) = df.utf8::(node_id) { - let iter = node_id.into_iter(); - let iter = maybe_tqdm!(iter.zip(const_prop_iter), size, "Loading node properties"); - - for (node_id, const_props) in iter { - if let Some(node_id) = node_id { - let v = graph - .node(node_id) - .ok_or_else(|| GraphError::NodeNameError(node_id.to_owned()))?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props.iter())?; + } else if let Some(node_id) = df.utf8::(node_id_index) { + let iter = node_id.into_iter(); + for (node_id, const_props) in iter.zip(const_prop_iter) { + if let Some(node_id) = node_id { + let v = graph + .node(node_id) + .ok_or_else(|| GraphError::NodeNameError(node_id.to_owned()))?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = &shared_const_properties { + v.add_constant_properties(shared_const_props.iter())?; + } } + let _ = pb.update(1); } - } - } else if let Some(node_id) = df.utf8::(node_id) { - let iter = node_id.into_iter(); - let iter = maybe_tqdm!(iter.zip(const_prop_iter), size, "Loading node properties"); - - for (node_id, const_props) in iter { - if let Some(node_id) = node_id { - let v = graph - .node(node_id) - .ok_or_else(|| GraphError::NodeNameError(node_id.to_owned()))?; - v.add_constant_properties(const_props)?; - if let Some(shared_const_props) = &shared_const_properties { - v.add_constant_properties(shared_const_props.iter())?; + } else if let Some(node_id) = df.utf8::(node_id_index) { + let iter = node_id.into_iter(); + for (node_id, const_props) in iter.zip(const_prop_iter) { + if let Some(node_id) = node_id { + let v = graph + .node(node_id) + .ok_or_else(|| GraphError::NodeNameError(node_id.to_owned()))?; + v.add_constant_properties(const_props)?; + if let Some(shared_const_props) = &shared_const_properties { + v.add_constant_properties(shared_const_props.iter())?; + } } + let _ = pb.update(1); } - } - } else { - return Err(GraphError::LoadFailure( - "node id column must be either u64 or text, time column must be i64. Ensure these contain no NaN, Null or None values.".to_string(), - )); + } else { + return Err(GraphError::LoadFailure( + "node id column must be either u64 or text, time column must be i64. 
Ensure these contain no NaN, Null or None values.".to_string(), + )); + }; } Ok(()) } pub(crate) fn load_edges_props_from_df< 'a, - S: AsRef, G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( - df: &'a DFView, - size: usize, + df_view: DFView>>, src: &str, dst: &str, - const_properties: Option>, - shared_const_properties: Option>, - layer: Option, + const_properties: Option<&[&str]>, + shared_const_properties: Option<&HashMap>, + layer: Option<&str>, layer_in_df: bool, graph: &G, ) -> Result<(), GraphError> { - let (_, const_prop_iter) = get_prop_rows(df, None, const_properties)?; - let layer = lift_layer(layer, layer_in_df, df); - - if let (Some(src), Some(dst)) = (df.iter_col::(src), df.iter_col::(dst)) { - let triplets = src.map(|i| i.copied()).zip(dst.map(|i| i.copied())); - let iter = maybe_tqdm!( - triplets.zip(const_prop_iter).zip(layer), - size, - "Loading edge properties" - ); - - for (((src, dst), const_props), layer) in iter { - if let (Some(src), Some(dst)) = (src, dst) { - let e = graph - .edge(src, dst) - .ok_or(GraphError::EdgeIdError { src, dst })?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + let const_properties = const_properties.unwrap_or(&[]); + let const_properties_indices = const_properties + .iter() + .map(|name| df_view.get_index(name)) + .collect::, GraphError>>()?; + let src_index = df_view.get_index(src)?; + let dst_index = df_view.get_index(dst)?; + let layer_index = layer + .filter(|_| layer_in_df) + .map(|layer| df_view.get_index(layer.as_ref())) + .transpose()?; + + let mut pb = build_progress_bar("Loading edge properties".to_string(), df_view.num_rows)?; + + for chunk in df_view.chunks { + let df = chunk?; + let const_prop_iter = combine_properties(const_properties, &const_properties_indices, &df)?; + + let layer = lift_layer(layer, layer_index, &df); + + if let (Some(src), Some(dst)) = + (df.iter_col::(src_index), df.iter_col::(dst_index)) + { + let triplets = src.map(|i| i.copied()).zip(dst.map(|i| i.copied())); + + for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + if let (Some(src), Some(dst)) = (src, dst) { + let e = graph + .edge(src, dst) + .ok_or(GraphError::EdgeIdError { src, dst })?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst)) = (df.iter_col::(src), df.iter_col::(dst)) { - let triplets = src - .map(i64_opt_into_u64_opt) - .zip(dst.map(i64_opt_into_u64_opt)); - let iter = maybe_tqdm!( - triplets.zip(const_prop_iter).zip(layer), - size, - "Loading edge properties" - ); - - for (((src, dst), const_props), layer) in iter { - if let (Some(src), Some(dst)) = (src, dst) { - let e = graph - .edge(src, dst) - .ok_or(GraphError::EdgeIdError { src, dst })?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } else if let (Some(src), Some(dst)) = + (df.iter_col::(src_index), df.iter_col::(dst_index)) + { + let triplets = src + .map(i64_opt_into_u64_opt) + .zip(dst.map(i64_opt_into_u64_opt)); + + for (((src, dst), const_props), layer) in 
triplets.zip(const_prop_iter).zip(layer) { + if let (Some(src), Some(dst)) = (src, dst) { + let e = graph + .edge(src, dst) + .ok_or(GraphError::EdgeIdError { src, dst })?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst)) = (df.utf8::(src), df.utf8::(dst)) { - let triplets = src.into_iter().zip(dst.into_iter()); - let iter = maybe_tqdm!( - triplets.zip(const_prop_iter).zip(layer), - size, - "Loading edge properties" - ); - - for (((src, dst), const_props), layer) in iter { - if let (Some(src), Some(dst)) = (src, dst) { - let e = graph - .edge(src, dst) - .ok_or_else(|| GraphError::EdgeNameError { - src: src.to_owned(), - dst: dst.to_owned(), - })?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } else if let (Some(src), Some(dst)) = + (df.utf8::(src_index), df.utf8::(dst_index)) + { + let triplets = src.into_iter().zip(dst.into_iter()); + for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + if let (Some(src), Some(dst)) = (src, dst) { + let e = graph + .edge(src, dst) + .ok_or_else(|| GraphError::EdgeNameError { + src: src.to_owned(), + dst: dst.to_owned(), + })?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else if let (Some(src), Some(dst)) = (df.utf8::(src), df.utf8::(dst)) { - let triplets = src.into_iter().zip(dst.into_iter()); - let iter = maybe_tqdm!( - triplets.zip(const_prop_iter).zip(layer), - size, - "Loading edge properties" - ); - - for (((src, dst), const_props), layer) in iter { - if let (Some(src), Some(dst)) = (src, dst) { - let e = graph - .edge(src, dst) - .ok_or_else(|| GraphError::EdgeNameError { - src: src.to_owned(), - dst: dst.to_owned(), - })?; - e.add_constant_properties(const_props, layer.as_deref())?; - if let Some(shared_const_props) = &shared_const_properties { - e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } else if let (Some(src), Some(dst)) = + (df.utf8::(src_index), df.utf8::(dst_index)) + { + let triplets = src.into_iter().zip(dst.into_iter()); + + for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + if let (Some(src), Some(dst)) = (src, dst) { + let e = graph + .edge(src, dst) + .ok_or_else(|| GraphError::EdgeNameError { + src: src.to_owned(), + dst: dst.to_owned(), + })?; + e.add_constant_properties(const_props, layer.as_deref())?; + if let Some(shared_const_props) = &shared_const_properties { + e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; + } } + let _ = pb.update(1); } - } - } else { - return Err(GraphError::LoadFailure( - "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." - .to_string(), - )); + } else { + return Err(GraphError::LoadFailure( + "Source and Target columns must be either u64 or text, Time column must be i64. Ensure these contain no NaN, Null or None values." 
+ .to_string(), + )); + }; } Ok(()) } @@ -558,19 +616,16 @@ fn load_edges_from_num_iter< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( graph: &G, - size: usize, + pb: &mut Bar, edges: I, properties: PI, const_properties: PI, - shared_const_properties: Option>, + shared_const_properties: Option<&HashMap>, layer: IL, ) -> Result<(), GraphError> { - let iter = maybe_tqdm!( - edges.zip(properties).zip(const_properties).zip(layer), - size, - "Loading edges" - ); - for (((((src, dst), time), edge_props), const_props), layer) in iter { + for (((((src, dst), time), edge_props), const_props), layer) in + edges.zip(properties).zip(const_properties).zip(layer) + { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { let e = graph.add_edge(time, src, dst, edge_props, layer.as_deref())?; e.add_constant_properties(const_props, layer.as_deref())?; @@ -578,6 +633,7 @@ fn load_edges_from_num_iter< e.add_constant_properties(shared_const_props.iter(), layer.as_deref())?; } } + let _ = pb.update(1); } Ok(()) } @@ -590,18 +646,15 @@ fn load_nodes_from_num_iter< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( graph: &G, - size: usize, + pb: &mut Bar, nodes: I, properties: PI, const_properties: PI, - shared_const_properties: Option>, + shared_const_properties: Option<&HashMap>, ) -> Result<(), GraphError> { - let iter = maybe_tqdm!( - nodes.zip(properties).zip(const_properties), - size, - "Loading nodes" - ); - for (((node, time, node_type), props), const_props) in iter { + for (((node, time, node_type), props), const_props) in + nodes.zip(properties).zip(const_properties) + { if let (Some(v), Some(t), n_t, props, const_props) = (node, time, node_type, props, const_props) { @@ -612,6 +665,7 @@ fn load_nodes_from_num_iter< if let Some(shared_const_props) = &shared_const_properties { v.add_constant_properties(shared_const_props.iter())?; } + let _ = pb.update(1); } } Ok(()) diff --git a/raphtory/src/io/arrow/mod.rs b/raphtory/src/io/arrow/mod.rs index 5ba8b9054..0bd4078ce 100644 --- a/raphtory/src/io/arrow/mod.rs +++ b/raphtory/src/io/arrow/mod.rs @@ -5,7 +5,10 @@ mod prop_handler; #[cfg(test)] mod test { use crate::{ - io::arrow::{dataframe::DFView, df_loaders::*}, + io::arrow::{ + dataframe::{DFChunk, DFView}, + df_loaders::*, + }, prelude::*, }; use polars_arrow::array::{PrimitiveArray, Utf8Array}; @@ -18,33 +21,38 @@ mod test { .iter() .map(|s| s.to_string()) .collect(), - arrays: vec![ - vec![ - Box::new(PrimitiveArray::::from(vec![Some(1)])), - Box::new(PrimitiveArray::::from(vec![Some(2)])), - Box::new(PrimitiveArray::::from(vec![Some(1)])), - Box::new(PrimitiveArray::::from(vec![Some(1.0)])), - Box::new(Utf8Array::::from(vec![Some("a")])), - ], - vec![ - Box::new(PrimitiveArray::::from(vec![Some(2), Some(3)])), - Box::new(PrimitiveArray::::from(vec![Some(3), Some(4)])), - Box::new(PrimitiveArray::::from(vec![Some(2), Some(3)])), - Box::new(PrimitiveArray::::from(vec![Some(2.0), Some(3.0)])), - Box::new(Utf8Array::::from(vec![Some("b"), Some("c")])), - ], - ], + chunks: vec![ + Ok(DFChunk { + chunk: vec![ + Box::new(PrimitiveArray::::from(vec![Some(1)])), + Box::new(PrimitiveArray::::from(vec![Some(2)])), + Box::new(PrimitiveArray::::from(vec![Some(1)])), + Box::new(PrimitiveArray::::from(vec![Some(1.0)])), + Box::new(Utf8Array::::from(vec![Some("a")])), + ], + }), + Ok(DFChunk { + chunk: vec![ + Box::new(PrimitiveArray::::from(vec![Some(2), Some(3)])), + Box::new(PrimitiveArray::::from(vec![Some(3), Some(4)])), + 
Box::new(PrimitiveArray::::from(vec![Some(2), Some(3)])), + Box::new(PrimitiveArray::::from(vec![Some(2.0), Some(3.0)])), + Box::new(Utf8Array::::from(vec![Some("b"), Some("c")])), + ], + }), + ] + .into_iter(), + num_rows: 3, }; let graph = Graph::new(); let layer: Option<&str> = None; let layer_in_df: bool = true; load_edges_from_df( - &df, - 5, + df, "src", "dst", "time", - Some(vec!["prop1", "prop2"]), + Some(&*vec!["prop1", "prop2"]), None, None, layer, @@ -108,29 +116,34 @@ mod test { .iter() .map(|s| s.to_string()) .collect(), - arrays: vec![ - vec![ - Box::new(PrimitiveArray::::from(vec![Some(1)])), - Box::new(Utf8Array::::from(vec![Some("a")])), - Box::new(PrimitiveArray::::from(vec![Some(1)])), - Box::new(Utf8Array::::from(vec![Some("atype")])), - ], - vec![ - Box::new(PrimitiveArray::::from(vec![Some(2)])), - Box::new(Utf8Array::::from(vec![Some("b")])), - Box::new(PrimitiveArray::::from(vec![Some(2)])), - Box::new(Utf8Array::::from(vec![Some("btype")])), - ], - ], + chunks: vec![ + Ok(DFChunk { + chunk: vec![ + Box::new(PrimitiveArray::::from(vec![Some(1)])), + Box::new(Utf8Array::::from(vec![Some("a")])), + Box::new(PrimitiveArray::::from(vec![Some(1)])), + Box::new(Utf8Array::::from(vec![Some("atype")])), + ], + }), + Ok(DFChunk { + chunk: vec![ + Box::new(PrimitiveArray::::from(vec![Some(2)])), + Box::new(Utf8Array::::from(vec![Some("b")])), + Box::new(PrimitiveArray::::from(vec![Some(2)])), + Box::new(Utf8Array::::from(vec![Some("btype")])), + ], + }), + ] + .into_iter(), + num_rows: 2, }; let graph = Graph::new(); load_nodes_from_df( - &df, - 3, + df, "id", "time", - Some(vec!["name"]), + Some(&*vec!["name"]), None, None, Some("node_type"), diff --git a/raphtory/src/io/arrow/prop_handler.rs b/raphtory/src/io/arrow/prop_handler.rs index c3d07979c..acbda631d 100644 --- a/raphtory/src/io/arrow/prop_handler.rs +++ b/raphtory/src/io/arrow/prop_handler.rs @@ -6,51 +6,45 @@ use polars_arrow::{ use crate::{ core::{utils::errors::GraphError, IntoPropList}, - io::arrow::dataframe::DFView, + io::arrow::dataframe::DFChunk, prelude::Prop, }; pub struct PropIter<'a> { - inner: Box> + 'a>, + inner: Vec> + 'a>>, } impl<'a> Iterator for PropIter<'a> { type Item = Vec<(&'a str, Prop)>; fn next(&mut self) -> Option { - self.inner.next() + self.inner + .iter_mut() + .map(|v| v.next()) + .filter_map(|r| match r { + Some(r1) => match r1 { + Some(r2) => Some(Some(r2)), + None => None, + }, + None => Some(None), + }) + .collect() } } -pub(crate) fn get_prop_rows<'a>( - df: &'a DFView, - props: Option>, - const_props: Option>, -) -> Result<(PropIter<'a>, PropIter<'a>), GraphError> { - let prop_iter = combine_properties(props, df)?; - let const_prop_iter = combine_properties(const_props, df)?; - Ok((prop_iter, const_prop_iter)) -} - -fn combine_properties<'a>( - props: Option>, - df: &'a DFView, +pub(crate) fn combine_properties<'a>( + props: &'a [&str], + indices: &'a [usize], + df: &'a DFChunk, ) -> Result, GraphError> { - let iter = props - .unwrap_or_default() - .into_iter() - .map(|name| lift_property(name, df)) - .reduce(|i1, i2| { - let i1 = i1?; - let i2 = i2?; - Ok(Box::new(i1.zip(i2).map(|(mut v1, v2)| { - v1.extend(v2); - v1 - }))) - }) - .unwrap_or_else(|| Ok(Box::new(std::iter::repeat(vec![])))); - - Ok(PropIter { inner: iter? 
}) + for idx in indices { + is_data_type_supported(df.chunk[*idx].data_type())?; + } + let zipped = props.iter().zip(indices.iter()); + let iter = zipped.map(|(name, idx)| lift_property(*idx, name, df)); + Ok(PropIter { + inner: iter.collect(), + }) } fn arr_as_prop(arr: Box) -> Prop { @@ -124,7 +118,7 @@ fn arr_as_prop(arr: Box) -> Prop { } } -fn validate_data_types(dt: &DataType) -> Result<(), GraphError> { +fn is_data_type_supported(dt: &DataType) -> Result<(), GraphError> { match dt { DataType::Boolean => {} DataType::Int32 => {} @@ -137,9 +131,9 @@ fn validate_data_types(dt: &DataType) -> Result<(), GraphError> { DataType::Float64 => {} DataType::Utf8 => {} DataType::LargeUtf8 => {} - DataType::List(v) => validate_data_types(v.data_type())?, - DataType::FixedSizeList(v, _) => validate_data_types(v.data_type())?, - DataType::LargeList(v) => validate_data_types(v.data_type())?, + DataType::List(v) => is_data_type_supported(v.data_type())?, + DataType::FixedSizeList(v, _) => is_data_type_supported(v.data_type())?, + DataType::LargeList(v) => is_data_type_supported(v.data_type())?, DataType::Timestamp(_, _) => {} _ => Err(GraphError::UnsupportedDataType)?, } @@ -147,255 +141,224 @@ fn validate_data_types(dt: &DataType) -> Result<(), GraphError> { } pub(crate) fn lift_property<'a: 'b, 'b>( + idx: usize, name: &'a str, - df: &'b DFView, -) -> Result> + 'b>, GraphError> { - let idx = df - .names - .iter() - .position(|n| n == name) - .ok_or_else(|| GraphError::ColumnDoesNotExist(name.to_string()))?; - - if let Some(first_chunk) = df.arrays.get(0) { - validate_data_types(first_chunk[idx].data_type())?; - } - - let r = df.arrays.iter().flat_map(move |arr| { - let arr: &Box = &arr[idx]; - match arr.data_type() { - DataType::Boolean => { - let arr = arr.as_any().downcast_ref::().unwrap(); - iter_as_prop(name, arr.iter()) - } - DataType::Int32 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::Int64 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::UInt8 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::UInt16 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::UInt32 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::UInt64 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::Float32 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::Float64 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter().map(|i| i.copied())) - } - DataType::Utf8 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter()) - } - DataType::LargeUtf8 => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_prop(name, arr.iter()) - } - DataType::List(_) => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_arr_prop(name, arr.iter()) - } - DataType::FixedSizeList(_, _) => { - let arr = arr.as_any().downcast_ref::().unwrap(); - iter_as_arr_prop(name, arr.iter()) - } - DataType::LargeList(_) => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - iter_as_arr_prop(name, arr.iter()) - } - 
DataType::Timestamp(timeunit, timezone) => { - let arr = arr.as_any().downcast_ref::>().unwrap(); - match timezone { - Some(_) => match timeunit { - TimeUnit::Second => { - println!("Timestamp(Second, Some({:?})); ", timezone); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::DTime( - DateTime::::from_timestamp(*v, 0) - .expect("DateTime conversion failed"), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Millisecond => { - println!("Timestamp(Millisecond, Some({:?})); ", timezone); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::DTime( - DateTime::::from_timestamp_millis(*v) - .expect("DateTime conversion failed"), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Microsecond => { - println!("Timestamp(Microsecond, Some({:?})); ", timezone); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::DTime( - DateTime::::from_timestamp_micros(*v) - .expect("DateTime conversion failed"), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Nanosecond => { - println!("Timestamp(Nanosecond, Some({:?})); ", timezone); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::DTime(DateTime::::from_timestamp_nanos( - *v, - )), - ) - }) - .collect::>() - })); - r - } - }, - None => match timeunit { - TimeUnit::Second => { - println!("Timestamp(Second, None); "); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::NDTime( - DateTime::from_timestamp(*v, 0) - .expect("DateTime conversion failed") - .naive_utc(), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Millisecond => { - println!("Timestamp(Millisecond, None); "); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::NDTime( - DateTime::from_timestamp_millis(*v) - .expect("DateTime conversion failed") - .naive_utc(), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Microsecond => { - println!("Timestamp(Microsecond, None); "); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::NDTime( - DateTime::from_timestamp_micros(*v) - .expect("DateTime conversion failed") - .naive_utc(), - ), - ) - }) - .collect::>() - })); - r - } - TimeUnit::Nanosecond => { - println!("Timestamp(Nanosecond, None); "); - let r: Box> + 'b> = - Box::new(arr.iter().map(move |val| { - val.into_iter() - .map(|v| { - ( - name, - Prop::NDTime( - DateTime::from_timestamp_nanos(*v).naive_utc(), - ), - ) - }) - .collect::>() - })); - r - } - }, - } + df: &'b DFChunk, +) -> Box> + 'b> { + let arr = &df.chunk[idx]; + let r = match arr.data_type() { + DataType::Boolean => { + let arr = arr.as_any().downcast_ref::().unwrap(); + iter_as_prop(name, arr.iter()) + } + DataType::Int32 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::Int64 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::UInt8 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::UInt16 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::UInt32 => { + let arr = 
arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::UInt64 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::Float32 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::Float64 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter().map(|i| i.copied())) + } + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter()) + } + DataType::LargeUtf8 => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_prop(name, arr.iter()) + } + DataType::List(_) => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_arr_prop(name, arr.iter()) + } + DataType::FixedSizeList(_, _) => { + let arr = arr.as_any().downcast_ref::().unwrap(); + iter_as_arr_prop(name, arr.iter()) + } + DataType::LargeList(_) => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + iter_as_arr_prop(name, arr.iter()) + } + DataType::Timestamp(timeunit, timezone) => { + let arr = arr.as_any().downcast_ref::>().unwrap(); + match timezone { + Some(_) => match timeunit { + TimeUnit::Second => { + println!("Timestamp(Second, Some({:?})); ", timezone); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::DTime( + DateTime::::from_timestamp(*v, 0) + .expect("DateTime conversion failed"), + ), + ) + }) + })); + r + } + TimeUnit::Millisecond => { + println!("Timestamp(Millisecond, Some({:?})); ", timezone); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::DTime( + DateTime::::from_timestamp_millis(*v) + .expect("DateTime conversion failed"), + ), + ) + }) + })); + r + } + TimeUnit::Microsecond => { + println!("Timestamp(Microsecond, Some({:?})); ", timezone); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::DTime( + DateTime::::from_timestamp_micros(*v) + .expect("DateTime conversion failed"), + ), + ) + }) + })); + r + } + TimeUnit::Nanosecond => { + println!("Timestamp(Nanosecond, Some({:?})); ", timezone); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + (name, Prop::DTime(DateTime::::from_timestamp_nanos(*v))) + }) + })); + r + } + }, + None => match timeunit { + TimeUnit::Second => { + println!("Timestamp(Second, None); "); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::NDTime( + DateTime::from_timestamp(*v, 0) + .expect("DateTime conversion failed") + .naive_utc(), + ), + ) + }) + })); + r + } + TimeUnit::Millisecond => { + println!("Timestamp(Millisecond, None); "); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::NDTime( + DateTime::from_timestamp_millis(*v) + .expect("DateTime conversion failed") + .naive_utc(), + ), + ) + }) + })); + r + } + TimeUnit::Microsecond => { + println!("Timestamp(Microsecond, None); "); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::NDTime( + DateTime::from_timestamp_micros(*v) + .expect("DateTime conversion failed") + .naive_utc(), + ), + ) + }) + })); + r + } + TimeUnit::Nanosecond => { + println!("Timestamp(Nanosecond, None); "); + let r: Box> + 'b> = + Box::new(arr.iter().map(move |val| { + val.map(|v| { + ( + name, + Prop::NDTime( + DateTime::from_timestamp_nanos(*v).naive_utc(), 
+ ), + ) + }) + })); + r + } + }, } - unsupported => panic!("Data type not supported: {:?}", unsupported), } - }); + unsupported => panic!("Data type not supported: {:?}", unsupported), + }; - Ok(Box::new(r)) + r } -pub(crate) fn lift_layer<'a, S: AsRef>( - layer: Option, - layer_in_df: bool, - df: &'a DFView, +pub(crate) fn lift_layer<'a>( + layer: Option<&str>, + layer_index: Option, + df: &'a DFChunk, ) -> Box> + 'a> { if let Some(layer) = layer { - if layer_in_df { - if let Some(col) = df.utf8::(layer.as_ref()) { - Box::new(col.map(|v| v.map(|v| v.to_string()))) - } else if let Some(col) = df.utf8::(layer.as_ref()) { - Box::new(col.map(|v| v.map(|v| v.to_string()))) - } else { - Box::new(std::iter::repeat(None)) + match layer_index { + Some(index) => { + if let Some(col) = df.utf8::(index) { + Box::new(col.map(|v| v.map(|v| v.to_string()))) + } else if let Some(col) = df.utf8::(index) { + Box::new(col.map(|v| v.map(|v| v.to_string()))) + } else { + Box::new(std::iter::repeat(None)) + } } - } else { - Box::new(std::iter::repeat(Some(layer.as_ref().to_string()))) + None => Box::new(std::iter::repeat(Some(layer.to_string()))), } } else { Box::new(std::iter::repeat(None)) @@ -405,21 +368,13 @@ pub(crate) fn lift_layer<'a, S: AsRef>( fn iter_as_prop<'a, T: Into + 'a, I: Iterator> + 'a>( name: &'a str, is: I, -) -> Box> + 'a> { - Box::new(is.map(move |val| { - val.into_iter() - .map(|v| (name, (v).into())) - .collect::>() - })) +) -> Box> + 'a> { + Box::new(is.map(move |val| val.map(|v| (name, v.into())))) } fn iter_as_arr_prop<'a, I: Iterator>> + 'a>( name: &'a str, is: I, -) -> Box> + 'a> { - Box::new(is.map(move |val| { - val.into_iter() - .map(|v| (name, arr_as_prop(v))) - .collect::>() - })) +) -> Box> + 'a> { + Box::new(is.map(move |val| val.map(|v| (name, arr_as_prop(v))))) } diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index ff27f45da..1a80e00aa 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -11,12 +11,7 @@ use crate::{ prelude::DeletionOps, }; use itertools::Itertools; -use polars_arrow::{ - array::Array, - datatypes::{ArrowDataType as DataType, ArrowSchema, Field}, - legacy::error, - record_batch::RecordBatch as Chunk, -}; +use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema, Field}; use polars_parquet::{ read, read::{read_metadata, FileMetaData, FileReader}, @@ -24,6 +19,7 @@ use polars_parquet::{ use std::{ collections::HashMap, fs, + fs::File, path::{Path, PathBuf}, }; @@ -36,13 +32,13 @@ pub fn load_nodes_from_parquet< time: &str, node_type: Option<&str>, node_type_in_df: Option, - properties: Option>, - const_properties: Option>, - shared_const_properties: Option>, + properties: Option<&[&str]>, + const_properties: Option<&[&str]>, + shared_const_properties: Option<&HashMap>, ) -> Result<(), GraphError> { let mut cols_to_check = vec![id, time]; - cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new())); - cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); + cols_to_check.extend(properties.unwrap_or(&Vec::new())); + cols_to_check.extend(const_properties.unwrap_or(&Vec::new())); if node_type_in_df.unwrap_or(true) { if let Some(ref node_type) = node_type { cols_to_check.push(node_type.as_ref()); @@ -50,17 +46,15 @@ pub fn load_nodes_from_parquet< } for path in get_parquet_file_paths(parquet_path)? 
diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs
index ff27f45da..1a80e00aa 100644
--- a/raphtory/src/io/parquet_loaders.rs
+++ b/raphtory/src/io/parquet_loaders.rs
@@ -11,12 +11,7 @@ use crate::{
     prelude::DeletionOps,
 };
 use itertools::Itertools;
-use polars_arrow::{
-    array::Array,
-    datatypes::{ArrowDataType as DataType, ArrowSchema, Field},
-    legacy::error,
-    record_batch::RecordBatch as Chunk,
-};
+use polars_arrow::datatypes::{ArrowDataType as DataType, ArrowSchema, Field};
 use polars_parquet::{
     read,
     read::{read_metadata, FileMetaData, FileReader},
@@ -24,6 +19,7 @@ use polars_parquet::{
 use std::{
     collections::HashMap,
     fs,
+    fs::File,
     path::{Path, PathBuf},
 };
 
@@ -36,13 +32,13 @@ pub fn load_nodes_from_parquet<
     time: &str,
     node_type: Option<&str>,
     node_type_in_df: Option<bool>,
-    properties: Option<Vec<&str>>,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    properties: Option<&[&str]>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
 ) -> Result<(), GraphError> {
     let mut cols_to_check = vec![id, time];
-    cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new()));
-    cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+    cols_to_check.extend(properties.unwrap_or(&Vec::new()));
+    cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
     if node_type_in_df.unwrap_or(true) {
         if let Some(ref node_type) = node_type {
             cols_to_check.push(node_type.as_ref());
@@ -50,17 +46,15 @@ pub fn load_nodes_from_parquet<
     }
 
     for path in get_parquet_file_paths(parquet_path)? {
-        let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-        let size = df.get_inner_size();
+        let df_view = process_parquet_file_to_df(path.as_path(), &cols_to_check)?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_nodes_from_df(
-            &df,
-            size,
+            df_view,
             id,
             time,
-            properties.clone(),
-            const_properties.clone(),
-            shared_const_properties.clone(),
+            properties,
+            const_properties,
+            shared_const_properties,
             node_type,
             node_type_in_df.unwrap_or(true),
             graph,
@@ -79,16 +73,16 @@ pub fn load_edges_from_parquet<
     src: &str,
     dst: &str,
     time: &str,
-    properties: Option<Vec<&str>>,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    properties: Option<&[&str]>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
     layer: Option<&str>,
     layer_in_df: Option<bool>,
 ) -> Result<(), GraphError> {
     let parquet_path = parquet_path.as_ref();
     let mut cols_to_check = vec![src, dst, time];
-    cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new()));
-    cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+    cols_to_check.extend(properties.unwrap_or(&Vec::new()));
+    cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
     if layer_in_df.unwrap_or(false) {
         if let Some(ref layer) = layer {
             cols_to_check.push(layer.as_ref());
@@ -96,18 +90,16 @@ pub fn load_edges_from_parquet<
     }
 
     for path in get_parquet_file_paths(parquet_path)? {
-        let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-        let size = cols_to_check.len();
+        let df_view = process_parquet_file_to_df(path.as_path(), &cols_to_check)?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
             time,
-            properties.clone(),
-            const_properties.clone(),
-            shared_const_properties.clone(),
+            properties,
+            const_properties,
+            shared_const_properties,
             layer,
             layer_in_df.unwrap_or(true),
             graph,
@@ -124,22 +116,21 @@ pub fn load_node_props_from_parquet<
     graph: &G,
     parquet_path: &Path,
     id: &str,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
 ) -> Result<(), GraphError> {
     let mut cols_to_check = vec![id];
-    cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+    cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
 
     for path in get_parquet_file_paths(parquet_path)? {
-        let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-        let size = cols_to_check.len();
+        let df_view = process_parquet_file_to_df(path.as_path(), &cols_to_check)?;
+        df_view.check_cols_exist(&cols_to_check)?;
+
         load_node_props_from_df(
-            &df,
-            size,
+            df_view,
             id,
-            const_properties.clone(),
-            shared_const_properties.clone(),
+            const_properties,
+            shared_const_properties,
             graph,
         )
         .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
@@ -155,8 +146,8 @@ pub fn load_edge_props_from_parquet<
     parquet_path: &Path,
     src: &str,
     dst: &str,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
     layer: Option<&str>,
     layer_in_df: Option<bool>,
 ) -> Result<(), GraphError> {
@@ -166,19 +157,17 @@ pub fn load_edge_props_from_parquet<
             cols_to_check.push(layer.as_ref());
         }
     }
-    cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+    cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
 
     for path in get_parquet_file_paths(parquet_path)? {
-        let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-        let size = cols_to_check.len();
+        let df_view = process_parquet_file_to_df(path.as_path(), &cols_to_check)?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_props_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
-            const_properties.clone(),
-            shared_const_properties.clone(),
+            const_properties,
+            shared_const_properties,
             layer,
             layer_in_df.unwrap_or(true),
             graph.core_graph(),
@@ -206,14 +195,11 @@ pub fn load_edges_deletions_from_parquet<
             cols_to_check.push(layer.as_ref());
         }
     }
-
     for path in get_parquet_file_paths(parquet_path)? {
-        let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-        let size = cols_to_check.len();
+        let df_view = process_parquet_file_to_df(path.as_path(), &cols_to_check)?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_deletions_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
             time,
@@ -223,38 +209,42 @@ pub fn load_edges_deletions_from_parquet<
         )
         .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
     }
-
     Ok(())
 }
 
 pub(crate) fn process_parquet_file_to_df(
     parquet_file_path: &Path,
-    col_names: Vec<&str>,
-) -> Result<DFView, GraphError> {
-    let (names, arrays) = read_parquet_file(parquet_file_path, &col_names)?;
+    col_names: &[&str],
+) -> Result<DFView<impl Iterator<Item = Result<DFChunk, GraphError>>>, GraphError> {
+    let (names, chunks, num_rows) = read_parquet_file(parquet_file_path, col_names)?;
 
-    let names = names
+    let names: Vec<String> = names
        .into_iter()
        .filter(|x| col_names.contains(&x.as_str()))
        .collect();
-    let arrays = arrays
-        .map_ok(|r| r.into_iter().map(|boxed| boxed.clone()).collect_vec())
-        .collect::<Result<Vec<_>, _>>()?;
-    Ok(DFView { names, arrays })
+    let chunks = chunks.into_iter().map(move |result| {
+        result
+            .map(|r| DFChunk {
+                chunk: r.into_iter().map(|boxed| boxed.clone()).collect_vec(),
+            })
+            .map_err(|e| {
+                GraphError::LoadFailure(format!("Failed to process Parquet file: {:?}", e))
+            })
+    });
+
+    Ok(DFView {
+        names,
+        chunks,
+        num_rows,
+    })
 }
 
 fn read_parquet_file(
     path: impl AsRef<Path>,
-    col_names: &Vec<&str>,
-) -> Result<
-    (
-        Vec<String>,
-        impl Iterator<Item = Result<Chunk<Box<dyn Array>>, error::PolarsError>>,
-    ),
-    GraphError,
-> {
-    let read_schema = |metadata: &FileMetaData| -> Result<ArrowSchema, GraphError> {
+    col_names: &[&str],
+) -> Result<(Vec<String>, FileReader<File>, usize), GraphError> {
+    let read_schema = |metadata: &FileMetaData| -> Result<(ArrowSchema, usize), GraphError> {
         let schema = read::infer_schema(metadata)?;
         let fields = schema
             .fields
@@ -272,20 +262,23 @@ fn read_parquet_file(
             })
             .collect::<Vec<_>>();
 
-        Ok(ArrowSchema::from(fields).with_metadata(schema.metadata))
+        Ok((
+            ArrowSchema::from(fields).with_metadata(schema.metadata),
+            metadata.num_rows,
+        ))
     };
 
     let mut file = std::fs::File::open(&path)?;
     let metadata = read_metadata(&mut file)?;
     let row_groups = metadata.clone().row_groups;
-    let schema = read_schema(&metadata)?;
+    let (schema, num_rows) = read_schema(&metadata)?;
 
     // Although fields are already filtered by col_names, we need names in the order as it appears
     // in the schema to create PretendDF
     let names = schema.fields.iter().map(|f| f.name.clone()).collect_vec();
 
     let reader = FileReader::new(file, row_groups, schema, None, None, None);
-    Ok((names, reader))
+    Ok((names, reader, num_rows))
 }
 
 fn get_parquet_file_paths(parquet_path: &Path) -> Result<Vec<PathBuf>, GraphErro
@@ -312,7 +305,7 @@ fn get_parquet_file_paths(parquet_path: &Path) -> Result<Vec<PathBuf>, GraphErro
 #[cfg(test)]
 mod test {
     use super::*;
-    use polars_arrow::array::{PrimitiveArray, Utf8Array};
+    use polars_arrow::array::{Array, PrimitiveArray, Utf8Array};
     use std::path::PathBuf;
 
     #[test]
@@ -320,28 +313,33 @@ mod test {
         let parquet_file_path =
             PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources/test/test_data.parquet");
 
-        let col_names = vec!["src", "dst", "time", "weight", "marbles"];
+        let col_names: &[&str] = &["src", "dst", "time", "weight", "marbles"];
         let df = process_parquet_file_to_df(parquet_file_path.as_path(), col_names).unwrap();
 
-        let df1 = DFView {
-            names: vec!["src", "dst", "time", "weight", "marbles"]
-                .iter()
-                .map(|s| s.to_string())
-                .collect(),
-            arrays: vec![vec![
-                Box::new(PrimitiveArray::<i64>::from_values(vec![1, 2, 3, 4, 5])),
-                Box::new(PrimitiveArray::<i64>::from_values(vec![2, 3, 4, 5, 6])),
-                Box::new(PrimitiveArray::<i64>::from_values(vec![1, 2, 3, 4, 5])),
-                Box::new(PrimitiveArray::<f64>::from_values(vec![
-                    1f64, 2f64, 3f64, 4f64, 5f64,
-                ])),
-                Box::new(Utf8Array::<i32>::from_iter_values(
-                    vec!["red", "blue", "green", "yellow", "purple"].into_iter(),
-                )),
-            ]],
-        };
+        let expected_names: Vec<String> = vec!["src", "dst", "time", "weight", "marbles"]
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+        let expected_chunks: Vec<Vec<Box<dyn Array>>> = vec![vec![
+            Box::new(PrimitiveArray::<i64>::from_values(vec![1, 2, 3, 4, 5])),
+            Box::new(PrimitiveArray::<i64>::from_values(vec![2, 3, 4, 5, 6])),
+            Box::new(PrimitiveArray::<i64>::from_values(vec![1, 2, 3, 4, 5])),
+            Box::new(PrimitiveArray::<f64>::from_values(vec![
+                1f64, 2f64, 3f64, 4f64, 5f64,
+            ])),
+            Box::new(Utf8Array::<i32>::from_iter_values(
+                vec!["red", "blue", "green", "yellow", "purple"].into_iter(),
+            )),
+        ]];
+
+        let actual_names = df.names;
+        let chunks: Vec<Result<DFChunk, GraphError>> = df.chunks.collect_vec();
+        let chunks: Result<Vec<DFChunk>, GraphError> = chunks.into_iter().collect();
+        let chunks: Vec<DFChunk> = chunks.unwrap();
+        let actual_chunks: Vec<Vec<Box<dyn Array>>> =
+            chunks.into_iter().map(|c: DFChunk| c.chunk).collect_vec();
 
-        assert_eq!(df.names, df1.names);
-        assert_eq!(df.arrays, df1.arrays);
+        assert_eq!(actual_names, expected_names);
+        assert_eq!(actual_chunks, expected_chunks);
     }
 }
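A minimal sketch (not part of the patch) of how the streaming view introduced here is meant to be drained inside the crate; the `DFView { names, chunks, num_rows }` and `DFChunk { chunk }` shapes are taken from this diff, while the cell-counting helper itself is illustrative.

```rust
fn count_cells<I>(df_view: DFView<I>) -> Result<usize, GraphError>
where
    I: Iterator<Item = Result<DFChunk, GraphError>>,
{
    let mut cells = 0;
    for chunk in df_view.chunks {
        // Each chunk is Result-wrapped so a bad record batch fails the load
        // without having materialised the whole file first.
        let DFChunk { chunk } = chunk?;
        cells += chunk.iter().map(|col| col.len()).sum::<usize>();
    }
    Ok(cells)
}
```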
diff --git a/raphtory/src/python/graph/disk_graph.rs b/raphtory/src/python/graph/disk_graph.rs
index c5e6426b3..9d66f6eba 100644
--- a/raphtory/src/python/graph/disk_graph.rs
+++ b/raphtory/src/python/graph/disk_graph.rs
@@ -7,7 +7,7 @@ use crate::{
     core::utils::errors::GraphError,
     db::graph::views::deletion_graph::PersistentGraph,
     disk_graph::{graph_impl::ParquetLayerCols, DiskGraphError, DiskGraphStorage},
-    io::arrow::dataframe::DFView,
+    io::arrow::dataframe::{DFChunk, DFView},
     prelude::Graph,
     python::{
         graph::graph::PyGraph, types::repr::StructReprBuilder, utils::errors::adapt_err_value,
@@ -148,18 +148,17 @@ impl PyDiskGraph {
         dst_col: &str,
         time_col: &str,
     ) -> Result<DiskGraphStorage, GraphError> {
-        let graph: Result<DiskGraphStorage, PyErr> = Python::with_gil(|py| {
+        let graph: Result<DiskGraphStorage, GraphError> = Python::with_gil(|py| {
             let cols_to_check = vec![src_col, dst_col, time_col];
 
             let df_columns: Vec<String> = edge_df.getattr("columns")?.extract()?;
             let df_columns: Vec<&str> = df_columns.iter().map(|x| x.as_str()).collect();
 
-            let df = process_pandas_py_df(edge_df, py, df_columns)?;
+            let df_view = process_pandas_py_df(edge_df, py, df_columns)?;
+            df_view.check_cols_exist(&cols_to_check)?;
+            let graph = Self::from_pandas(graph_dir, df_view, src_col, dst_col, time_col)?;
 
-            df.check_cols_exist(&cols_to_check)?;
-            let graph = Self::from_pandas(graph_dir, df, src_col, dst_col, time_col)?;
-
-            Ok::<_, PyErr>(graph)
+            Ok::<_, GraphError>(graph)
         });
 
         graph.map_err(|e| {
@@ -177,7 +176,9 @@ impl PyDiskGraph {
     }
 
     #[staticmethod]
-    #[pyo3(signature = (graph_dir, layer_parquet_cols, node_properties, chunk_size, t_props_chunk_size, read_chunk_size, concurrent_files, num_threads, node_type_col))]
+    #[pyo3(
+        signature = (graph_dir, layer_parquet_cols, node_properties, chunk_size, t_props_chunk_size, read_chunk_size, concurrent_files, num_threads, node_type_col)
+    )]
     fn load_from_parquets(
         graph_dir: &str,
         layer_parquet_cols: ParquetLayerColsList,
         node_properties: Option<PathBuf>,
         chunk_size: usize,
         t_props_chunk_size: usize,
         read_chunk_size: Option<usize>,
         concurrent_files: Option<usize>,
         num_threads: usize,
         node_type_col: Option<&str>,
     ) -> Result<DiskGraphStorage, GraphError> {
@@ -231,49 +232,52 @@ impl PyDiskGraph {
     fn from_pandas(
         graph_dir: &str,
-        df: DFView,
+        df_view: DFView<impl Iterator<Item = Result<DFChunk, GraphError>>>,
         src: &str,
         dst: &str,
         time: &str,
     ) -> Result<DiskGraphStorage, GraphError> {
-        let src_col_idx = df.names.iter().position(|x| x == src).unwrap();
-        let dst_col_idx = df.names.iter().position(|x| x == dst).unwrap();
-        let time_col_idx = df.names.iter().position(|x| x == time).unwrap();
-
-        let chunk_size = df
-            .arrays
-            .first()
-            .map(|arr| arr.len())
-            .ok_or_else(|| GraphError::LoadFailure("Empty pandas dataframe".to_owned()))?;
-
-        let t_props_chunk_size = chunk_size;
-
-        let names = df.names.clone();
-
-        let edge_lists = df
-            .arrays
-            .into_iter()
-            .map(|arr| {
-                let fields = arr
+        let src_index = df_view.get_index(src)?;
+        let dst_index = df_view.get_index(dst)?;
+        let time_index = df_view.get_index(time)?;
+
+        let mut chunks_iter = df_view.chunks.peekable();
+        let chunk_size = if let Some(result) = chunks_iter.peek() {
+            match result {
+                Ok(df) => df.chunk.len(),
+                Err(e) => {
+                    return Err(GraphError::LoadFailure(format!(
+                        "Failed to load graph {e:?}"
+                    )))
+                }
+            }
+        } else {
+            return Err(GraphError::LoadFailure("No chunks available".to_string()));
+        };
+
+        let edge_lists = chunks_iter
+            .map_ok(|df| {
+                let fields = df
+                    .chunk
                     .iter()
-                    .zip(names.iter())
+                    .zip(df_view.names.iter())
                     .map(|(arr, col_name)| {
                         Field::new(col_name, arr.data_type().clone(), arr.null_count() > 0)
                     })
                     .collect_vec();
-                let s_array = StructArray::new(DataType::Struct(fields), arr, None);
+                let s_array = StructArray::new(DataType::Struct(fields), df.chunk, None);
                 s_array
             })
-            .collect::<Vec<_>>();
+            .collect::<Result<Vec<_>, GraphError>>()?;
 
         DiskGraphStorage::load_from_edge_lists(
             &edge_lists,
             chunk_size,
-            t_props_chunk_size,
+            chunk_size,
             graph_dir,
-            src_col_idx,
-            dst_col_idx,
-            time_col_idx,
+            src_index,
+            dst_index,
+            time_index,
         )
         .map_err(|err| GraphError::LoadFailure(format!("Failed to load graph {err:?}")))
     }
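A standalone illustration (std only; the names are hypothetical) of the peek-first pattern used by `from_pandas` above: inspect the first chunk to pick a chunk size without consuming the iterator that is loaded afterwards.

```rust
fn first_chunk_len<I>(chunks: I) -> Result<usize, String>
where
    I: Iterator<Item = Result<Vec<u8>, String>>,
{
    let mut chunks = chunks.peekable();
    match chunks.peek() {
        // The iterator still starts at this chunk after peeking.
        Some(Ok(chunk)) => Ok(chunk.len()),
        Some(Err(e)) => Err(e.clone()),
        None => Err("no chunks available".to_string()),
    }
}
```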
diff --git a/raphtory/src/python/graph/graph.rs b/raphtory/src/python/graph/graph.rs
index 41fd11e1b..6828eacfa 100644
--- a/raphtory/src/python/graph/graph.rs
+++ b/raphtory/src/python/graph/graph.rs
@@ -451,9 +451,11 @@ impl PyGraph {
     /// Returns:
     ///     Graph: The loaded Graph object.
     #[staticmethod]
-    #[pyo3(signature = (edge_df, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None,
-    edge_layer = None, layer_in_df = true, node_df = None, node_id = None, node_time = None, node_properties = None,
-    node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true))]
+    #[pyo3(
+        signature = (edge_df, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None,
+        edge_layer = None, layer_in_df = true, node_df = None, node_id = None, node_time = None, node_properties = None,
+        node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true)
+    )]
     fn load_from_pandas(
         edge_df: &PyAny,
         edge_src: &str,
@@ -473,33 +475,33 @@ impl PyGraph {
         node_type: Option<&str>,
         node_type_in_df: Option<bool>,
     ) -> Result<Graph, GraphError> {
-        let graph = PyGraph {
-            graph: Graph::new(),
-        };
+        let graph = Graph::new();
         if let (Some(node_df), Some(node_id), Some(node_time)) = (node_df, node_id, node_time) {
-            graph.load_nodes_from_pandas(
+            load_nodes_from_pandas(
+                &graph.core_graph(),
                 node_df,
                 node_id,
                 node_time,
                 node_type,
                 node_type_in_df,
-                node_properties,
-                node_const_properties,
-                node_shared_const_properties,
+                node_properties.as_ref().map(|props| props.as_ref()),
+                node_const_properties.as_ref().map(|props| props.as_ref()),
+                node_shared_const_properties.as_ref(),
             )?;
         }
-        graph.load_edges_from_pandas(
+        load_edges_from_pandas(
+            &graph.core_graph(),
             edge_df,
             edge_src,
             edge_dst,
             edge_time,
-            edge_properties,
-            edge_const_properties,
-            edge_shared_const_properties,
+            edge_properties.as_ref().map(|props| props.as_ref()),
+            edge_const_properties.as_ref().map(|props| props.as_ref()),
+            edge_shared_const_properties.as_ref(),
             edge_layer,
             layer_in_df,
         )?;
-        Ok(graph.graph)
+        Ok(graph)
     }
 
     /// Load a graph from Parquet file.
@@ -526,9 +528,11 @@ impl PyGraph {
     /// Returns:
     ///     Graph: The loaded Graph object.
     #[staticmethod]
-    #[pyo3(signature = (edge_parquet_path, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None,
-    edge_layer = None, layer_in_df = true, node_parquet_path = None, node_id = None, node_time = None, node_properties = None,
-    node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true))]
+    #[pyo3(
+        signature = (edge_parquet_path, edge_src, edge_dst, edge_time, edge_properties = None, edge_const_properties = None, edge_shared_const_properties = None,
+        edge_layer = None, layer_in_df = true, node_parquet_path = None, node_id = None, node_time = None, node_properties = None,
+        node_const_properties = None, node_shared_const_properties = None, node_type = None, node_type_in_df = true)
+    )]
     fn load_from_parquet(
         edge_parquet_path: PathBuf,
         edge_src: &str,
@@ -548,35 +552,37 @@ impl PyGraph {
         node_type: Option<&str>,
         node_type_in_df: Option<bool>,
     ) -> Result<Graph, GraphError> {
-        let graph = PyGraph {
-            graph: Graph::new(),
-        };
+        let graph = Graph::new();
+
         if let (Some(node_parquet_path), Some(node_id), Some(node_time)) =
             (node_parquet_path, node_id, node_time)
         {
-            graph.load_nodes_from_parquet(
-                node_parquet_path,
+            load_nodes_from_parquet(
+                &graph,
+                &node_parquet_path,
                 node_id,
                 node_time,
                 node_type,
                 node_type_in_df,
-                node_properties,
-                node_const_properties,
-                node_shared_const_properties,
+                node_properties.as_ref().map(|props| props.as_ref()),
+                node_const_properties.as_ref().map(|props| props.as_ref()),
+                node_shared_const_properties.as_ref(),
            )?;
        }
-        graph.load_edges_from_parquet(
+        load_edges_from_parquet(
+            &graph,
             edge_parquet_path,
             edge_src,
             edge_dst,
             edge_time,
-            edge_properties,
-            edge_const_properties,
-            edge_shared_const_properties,
+            edge_properties.as_ref().map(|props| props.as_ref()),
+            edge_const_properties.as_ref().map(|props| props.as_ref()),
+            edge_shared_const_properties.as_ref(),
             edge_layer,
             layer_in_df,
         )?;
-        Ok(graph.graph)
+
+        Ok(graph)
     }
 
     /// Load nodes from a Pandas DataFrame into the graph.
@@ -592,7 +598,9 @@ impl PyGraph {
     ///     shared_const_properties (Dictionary/Hashmap of properties): A dictionary of constant properties that will be added to every node. Defaults to None. (optional)
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (df, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None))]
+    #[pyo3(
+        signature = (df, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None)
+    )]
     fn load_nodes_from_pandas(
         &self,
         df: &PyAny,
@@ -611,9 +619,9 @@ impl PyGraph {
             time,
             node_type,
             node_type_in_df,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -630,7 +638,9 @@ impl PyGraph {
     ///     shared_const_properties (Dictionary/Hashmap of properties): A dictionary of constant properties that will be added to every node. Defaults to None. (optional)
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (parquet_path, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None))]
+    #[pyo3(
+        signature = (parquet_path, id, time, node_type = None, node_type_in_df = true, properties = None, const_properties = None, shared_const_properties = None)
+    )]
     fn load_nodes_from_parquet(
         &self,
         parquet_path: PathBuf,
@@ -649,9 +659,9 @@ impl PyGraph {
             time,
             node_type,
             node_type_in_df,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -670,7 +680,9 @@ impl PyGraph {
     ///
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (df, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
+    #[pyo3(
+        signature = (df, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true)
+    )]
     fn load_edges_from_pandas(
         &self,
         df: &PyAny,
@@ -689,9 +701,9 @@ impl PyGraph {
             src,
             dst,
             time,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -712,7 +724,9 @@ impl PyGraph {
     ///
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (parquet_path, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
+    #[pyo3(
+        signature = (parquet_path, src, dst, time, properties = None, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true)
+    )]
     fn load_edges_from_parquet(
         &self,
         parquet_path: PathBuf,
@@ -731,9 +745,9 @@ impl PyGraph {
             src,
             dst,
             time,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -761,8 +775,8 @@ impl PyGraph {
             self.graph.core_graph(),
             df,
             id,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -788,8 +802,8 @@ impl PyGraph {
             &self.graph,
             parquet_path.as_path(),
             id,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -806,7 +820,9 @@ impl PyGraph {
     ///
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (df, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
+    #[pyo3(
+        signature = (df, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true)
+    )]
     fn load_edge_props_from_pandas(
         &self,
         df: &PyAny,
@@ -822,8 +838,8 @@ impl PyGraph {
             df,
             src,
             dst,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -842,7 +858,9 @@ impl PyGraph {
     ///
     /// Returns:
     ///     Result<(), GraphError>: Result of the operation.
-    #[pyo3(signature = (parquet_path, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true))]
+    #[pyo3(
+        signature = (parquet_path, src, dst, const_properties = None, shared_const_properties = None, layer = None, layer_in_df = true)
+    )]
     fn load_edge_props_from_parquet(
         &self,
         parquet_path: PathBuf,
@@ -858,8 +876,8 @@ impl PyGraph {
             parquet_path.as_path(),
             src,
             dst,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
     }
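A toy pyo3 class (hypothetical names, not from this repository) showing the `#[pyo3(signature = ...)]` pattern reformatted throughout this file: defaults such as `layer_in_df = true` become Python keyword defaults on the bound method.

```rust
use pyo3::prelude::*;

#[pyclass]
struct Loader;

#[pymethods]
impl Loader {
    // Callable from Python as loader.load("a", "b") or
    // loader.load("a", "b", layer="x", layer_in_df=False).
    #[pyo3(signature = (src, dst, layer = None, layer_in_df = true))]
    fn load(&self, src: &str, dst: &str, layer: Option<&str>, layer_in_df: bool) -> String {
        format!("{src}->{dst} layer={layer:?} in_df={layer_in_df}")
    }
}
```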
diff --git a/raphtory/src/python/graph/graph_with_deletions.rs b/raphtory/src/python/graph/graph_with_deletions.rs
index 4ad7f2842..59ddd4094 100644
--- a/raphtory/src/python/graph/graph_with_deletions.rs
+++ b/raphtory/src/python/graph/graph_with_deletions.rs
@@ -562,9 +562,9 @@ impl PyPersistentGraph {
             time,
             node_type,
             node_type_in_df,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -600,9 +600,9 @@ impl PyPersistentGraph {
             time,
             node_type,
             node_type_in_df,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -640,9 +640,9 @@ impl PyPersistentGraph {
             src,
             dst,
             time,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -682,9 +682,9 @@ impl PyPersistentGraph {
             src,
             dst,
             time,
-            properties,
-            const_properties,
-            shared_const_properties,
+            properties.as_ref().map(|props| props.as_ref()),
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -770,8 +770,8 @@ impl PyPersistentGraph {
             &self.graph.0,
             df,
             id,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -797,8 +797,8 @@ impl PyPersistentGraph {
             &self.graph,
             parquet_path.as_path(),
             id,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
         )
     }
 
@@ -831,8 +831,8 @@ impl PyPersistentGraph {
             df,
             src,
             dst,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
@@ -867,8 +867,8 @@ impl PyPersistentGraph {
             parquet_path.as_path(),
             src,
             dst,
-            const_properties,
-            shared_const_properties,
+            const_properties.as_ref().map(|props| props.as_ref()),
+            shared_const_properties.as_ref(),
             layer,
             layer_in_df,
         )
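A self-contained demo of the borrow conversion repeated across these call sites: `.as_ref().map(|props| props.as_ref())` turns an owned `Option<Vec<&str>>` into an `Option<&[&str]>`, so the loaders borrow the column lists instead of cloning them (the helper name is hypothetical).

```rust
fn as_opt_slice<'a>(props: &'a Option<Vec<&'a str>>) -> Option<&'a [&'a str]> {
    // Option<Vec<&str>> -> Option<&Vec<&str>> -> Option<&[&str]>
    props.as_ref().map(|p| p.as_ref())
}

fn main() {
    let owned = Some(vec!["weight", "marbles"]);
    let borrowed: Option<&[&str]> = as_opt_slice(&owned);
    assert_eq!(borrowed.map(|s| s.len()), Some(2));
}
```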
diff --git a/raphtory/src/python/graph/io/pandas_loaders.rs b/raphtory/src/python/graph/io/pandas_loaders.rs
index b211de46a..be0317abc 100644
--- a/raphtory/src/python/graph/io/pandas_loaders.rs
+++ b/raphtory/src/python/graph/io/pandas_loaders.rs
@@ -5,7 +5,11 @@ use crate::{
     python::graph::io::*,
 };
 use polars_arrow::{array::Array, ffi};
-use pyo3::{ffi::Py_uintptr_t, prelude::*, types::IntoPyDict};
+use pyo3::{
+    ffi::Py_uintptr_t,
+    prelude::*,
+    types::{IntoPyDict, PyDict},
+};
 use std::collections::HashMap;
 
 pub fn load_nodes_from_pandas(
     graph: &GraphStorage,
     df: &PyAny,
     id: &str,
     time: &str,
     node_type: Option<&str>,
     node_type_in_df: Option<bool>,
-    properties: Option<Vec<&str>>,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    properties: Option<&[&str]>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
 ) -> Result<(), GraphError> {
     Python::with_gil(|py| {
-        let size: usize = py
-            .eval(
-                "index.__len__()",
-                Some([("index", df.getattr("index")?)].into_py_dict(py)),
-                None,
-            )?
-            .extract()?;
-
         let mut cols_to_check = vec![id, time];
-        cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new()));
-        cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+        cols_to_check.extend(properties.unwrap_or(&Vec::new()));
+        cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
         if node_type_in_df.unwrap_or(true) {
             if let Some(ref node_type) = node_type {
                 cols_to_check.push(node_type.as_ref());
             }
         }
 
-        let df = process_pandas_py_df(df, py, cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-
+        let df_view = process_pandas_py_df(df, py, cols_to_check.clone())?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_nodes_from_df(
-            &df,
-            size,
+            df_view,
             id,
             time,
             properties,
@@ -65,36 +59,26 @@ pub fn load_edges_from_pandas(
     src: &str,
     dst: &str,
     time: &str,
-    properties: Option<Vec<&str>>,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    properties: Option<&[&str]>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
     layer: Option<&str>,
     layer_in_df: Option<bool>,
 ) -> Result<(), GraphError> {
     Python::with_gil(|py| {
-        let size: usize = py
-            .eval(
-                "index.__len__()",
-                Some([("index", df.getattr("index")?)].into_py_dict(py)),
-                None,
-            )?
-            .extract()?;
-
         let mut cols_to_check = vec![src, dst, time];
-        cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new()));
-        cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
+        cols_to_check.extend(properties.unwrap_or(&Vec::new()));
+        cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
         if layer_in_df.unwrap_or(false) {
             if let Some(ref layer) = layer {
                 cols_to_check.push(layer.as_ref());
             }
         }
 
-        let df = process_pandas_py_df(df, py, cols_to_check.clone())?;
-
-        df.check_cols_exist(&cols_to_check)?;
+        let df_view = process_pandas_py_df(df, py, cols_to_check.clone())?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
             time,
@@ -106,7 +90,6 @@ pub fn load_edges_from_pandas(
             graph,
         )
         .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?;
-
         Ok::<(), PyErr>(())
     })
     .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
@@ -117,32 +100,22 @@ pub fn load_node_props_from_pandas(
     graph: &GraphStorage,
     df: &PyAny,
     id: &str,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
 ) -> Result<(), GraphError> {
     Python::with_gil(|py| {
-        let size: usize = py
-            .eval(
-                "index.__len__()",
-                Some([("index", df.getattr("index")?)].into_py_dict(py)),
-                None,
-            )?
-            .extract()?;
         let mut cols_to_check = vec![id];
-        cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
-        let df = process_pandas_py_df(df, py, cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-
+        cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
+        let df_view = process_pandas_py_df(df, py, cols_to_check.clone())?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_node_props_from_df(
-            &df,
-            size,
+            df_view,
             id,
             const_properties,
             shared_const_properties,
             graph,
         )
         .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?;
-
         Ok::<(), PyErr>(())
     })
     .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
@@ -154,31 +127,23 @@ pub fn load_edge_props_from_pandas(
     df: &PyAny,
     src: &str,
     dst: &str,
-    const_properties: Option<Vec<&str>>,
-    shared_const_properties: Option<HashMap<String, Prop>>,
+    const_properties: Option<&[&str]>,
+    shared_const_properties: Option<&HashMap<String, Prop>>,
     layer: Option<&str>,
     layer_in_df: Option<bool>,
 ) -> Result<(), GraphError> {
     Python::with_gil(|py| {
-        let size: usize = py
-            .eval(
-                "index.__len__()",
-                Some([("index", df.getattr("index")?)].into_py_dict(py)),
-                None,
-            )?
-            .extract()?;
         let mut cols_to_check = vec![src, dst];
         if layer_in_df.unwrap_or(false) {
             if let Some(ref layer) = layer {
                 cols_to_check.push(layer.as_ref());
             }
         }
-        cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
-        let df = process_pandas_py_df(df, py, cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
+        cols_to_check.extend(const_properties.unwrap_or(&Vec::new()));
+        let df_view = process_pandas_py_df(df, py, cols_to_check.clone())?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_props_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
             const_properties,
@@ -188,7 +153,6 @@ pub fn load_edge_props_from_pandas(
             graph,
         )
         .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?;
-        df.check_cols_exist(&cols_to_check)?;
         Ok::<(), PyErr>(())
     })
     .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
@@ -205,14 +169,6 @@ pub fn load_edges_deletions_from_pandas(
     layer_in_df: Option<bool>,
 ) -> Result<(), GraphError> {
     Python::with_gil(|py| {
-        let size: usize = py
-            .eval(
-                "index.__len__()",
-                Some([("index", df.getattr("index")?)].into_py_dict(py)),
-                None,
-            )?
-            .extract()?;
-
         let mut cols_to_check = vec![src, dst, time];
         if layer_in_df.unwrap_or(true) {
             if let Some(ref layer) = layer {
@@ -220,12 +176,10 @@ pub fn load_edges_deletions_from_pandas(
             }
         }
 
-        let df = process_pandas_py_df(df, py, cols_to_check.clone())?;
-        df.check_cols_exist(&cols_to_check)?;
-
+        let df_view = process_pandas_py_df(df, py, cols_to_check.clone())?;
+        df_view.check_cols_exist(&cols_to_check)?;
         load_edges_deletions_from_df(
-            &df,
-            size,
+            df_view,
             src,
             dst,
             time,
@@ -234,18 +188,17 @@ pub fn load_edges_deletions_from_pandas(
             graph.core_graph(),
         )
         .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?;
-
         Ok::<(), PyErr>(())
     })
     .map_err(|e| GraphError::LoadFailure(format!("Failed to load graph {e:?}")))?;
     Ok(())
 }
 
-pub(crate) fn process_pandas_py_df(
-    df: &PyAny,
-    py: Python,
+pub(crate) fn process_pandas_py_df<'a>(
+    df: &'a PyAny,
+    py: Python<'a>,
     col_names: Vec<&str>,
-) -> PyResult<DFView> {
+) -> PyResult<DFView<impl Iterator<Item = Result<DFChunk, GraphError>> + 'a>> {
     is_jupyter(py);
     py.import("pandas")?;
     let module = py.import("pyarrow")?;
@@ -268,8 +221,11 @@ pub(crate) fn process_pandas_py_df(
     let _df_columns: Vec<String> = dropped_df.getattr("columns")?.extract()?;
 
     let table = pa_table.call_method("from_pandas", (dropped_df,), None)?;
-
-    let rb = table.call_method0("to_batches")?.extract::<Vec<&PyAny>>()?;
+    let kwargs = PyDict::new(py);
+    kwargs.set_item("max_chunksize", 100000)?;
+    let rb = table
+        .call_method("to_batches", (), Some(kwargs))?
+        .extract::<Vec<&PyAny>>()?;
     let names: Vec<String> = if let Some(batch0) = rb.get(0) {
         let schema = batch0.getattr("schema")?;
         schema.getattr("names")?.extract::<Vec<String>>()?
@@ -280,21 +236,33 @@ pub(crate) fn process_pandas_py_df(
         .filter(|x| col_names.contains(&x.as_str()))
         .collect();
 
-    let arrays = rb
-        .iter()
-        .map(|rb| {
-            (0..names.len())
-                .map(|i| {
-                    let array = rb.call_method1("column", (i,))?;
-                    let arr = array_to_rust(array)?;
-                    Ok::<Box<dyn Array>, PyErr>(arr)
-                })
-                .collect::<Result<Vec<_>, PyErr>>()
-        })
-        .collect::<Result<Vec<_>, PyErr>>()?;
-
-    let df = DFView { names, arrays };
-    Ok(df)
+    let names_len = names.len();
+    let chunks = rb.into_iter().map(move |rb| {
+        let chunk = (0..names_len)
+            .map(|i| {
+                let array = rb
+                    .call_method1("column", (i,))
+                    .map_err(|e| GraphError::from(e))?;
+                let arr = array_to_rust(array).map_err(|e| GraphError::from(e))?;
+                Ok::<Box<dyn Array>, GraphError>(arr)
+            })
+            .collect::<Result<Vec<_>, GraphError>>()?;
+
+        Ok(DFChunk { chunk })
+    });
+    let num_rows: usize = py
+        .eval(
+            "index.__len__()",
+            Some([("index", df.getattr("index")?)].into_py_dict(py)),
+            None,
+        )?
+        .extract()?;
+
+    Ok(DFView {
+        names,
+        chunks,
+        num_rows,
+    })
 }
 
 pub fn array_to_rust(obj: &PyAny) -> PyResult<ArrayRef> {