From 7f086c0bc53cdffeba565ad51a87e3b47eca9e4a Mon Sep 17 00:00:00 2001 From: Fabian Murariu Date: Fri, 26 Jul 2024 16:36:29 +0100 Subject: [PATCH] put back the iterator loading bars when calling from python --- Cargo.lock | 58 +--- Cargo.toml | 4 +- raphtory/Cargo.toml | 2 +- .../community_detection/label_propagation.rs | 1 - .../components/connected_components.rs | 2 - .../entities/graph/logical_to_physical.rs | 8 - raphtory/src/db/api/view/internal/core_ops.rs | 1 + raphtory/src/db/api/view/serialise.rs | 3 +- raphtory/src/db/mod.rs | 1 - raphtory/src/io/arrow/dataframe.rs | 7 + raphtory/src/io/arrow/df_loaders.rs | 282 ++++++++++++++++-- raphtory/src/io/parquet_loaders.rs | 10 + .../src/python/graph/io/pandas_loaders.rs | 53 +++- 13 files changed, 326 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4758ffb3ff..b9e702add6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4026,37 +4026,6 @@ dependencies = [ name = "pometry-storage" version = "0.10.0" -[[package]] -name = "pometry-storage-private" -version = "0.8.1" -dependencies = [ - "ahash", - "bincode", - "bytemuck", - "criterion", - "itertools 0.12.1", - "memmap2", - "num-traits", - "once_cell", - "parking_lot", - "polars-arrow", - "polars-parquet", - "polars-utils", - "proptest", - "rand 0.8.5", - "raphtory-api", - "rayon", - "serde", - "serde_json", - "strum", - "tempfile", - "thiserror", - "tracing", - "tracing-subscriber", - "tracing-test", - "twox-hash", -] - [[package]] name = "portable-atomic" version = "1.6.0" @@ -4502,7 +4471,7 @@ dependencies = [ "polars-arrow", "polars-parquet", "polars-utils", - "pometry-storage-private", + "pometry-storage", "pretty_assertions", "proptest", "prost", @@ -4564,7 +4533,7 @@ dependencies = [ "csv", "flate2", "polars-arrow", - "pometry-storage-private", + "pometry-storage", "rand 0.8.5", "raphtory", "raphtory-api", @@ -4592,7 +4561,7 @@ dependencies = [ "pest", "pest_derive", "polars-arrow", - "pometry-storage-private", + "pometry-storage", "pretty_assertions", "proptest", "rand 0.8.5", @@ -6080,27 +6049,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "tracing-test" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" -dependencies = [ - "tracing-core", - "tracing-subscriber", - "tracing-test-macro", -] - -[[package]] -name = "tracing-test-macro" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" -dependencies = [ - "quote", - "syn 2.0.66", -] - [[package]] name = "triomphe" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index a795633d66..b9389b614f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,9 +34,9 @@ debug = true [workspace.dependencies] #[public-storage] -# pometry-storage = { version = ">=0.8.1", path = "pometry-storage" } +pometry-storage = { version = ">=0.8.1", path = "pometry-storage" } #[private-storage] -pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" } +# pometry-storage = { path = "pometry-storage-private", package = "pometry-storage-private" } async-graphql = { version = "7.0.5", features = ["dynamic-schema"] } async-graphql-poem = "7.0.5" dynamic-graphql = "0.9.0" diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 22de6e7040..8a2b23efdb 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -125,6 +125,7 @@ python = [ "polars-arrow?/compute", "raphtory-api/python", "dep:rpds", + "dep:kdam", "kdam?/notebook" ] @@ -146,7 +147,6 @@ arrow = [ "dep:polars-arrow", "dep:polars-parquet", "polars-parquet?/compression", - "dep:kdam", ] proto = [ diff --git a/raphtory/src/algorithms/community_detection/label_propagation.rs b/raphtory/src/algorithms/community_detection/label_propagation.rs index 730e7938bf..5abae0ed6c 100644 --- a/raphtory/src/algorithms/community_detection/label_propagation.rs +++ b/raphtory/src/algorithms/community_detection/label_propagation.rs @@ -107,7 +107,6 @@ mod lpa_tests { .iter() .map(|n_set| n_set.iter().map(|n| n.node).collect::>()) .collect::>(); - println!("{:?}", ids); let expected = vec![ HashSet::from([ diff --git a/raphtory/src/algorithms/components/connected_components.rs b/raphtory/src/algorithms/components/connected_components.rs index f9c89e49be..86cd41182c 100644 --- a/raphtory/src/algorithms/components/connected_components.rs +++ b/raphtory/src/algorithms/components/connected_components.rs @@ -279,8 +279,6 @@ mod cc_test { let vs = vs.into_iter().unique().collect::>(); - // let smallest = vs.iter().min().unwrap(); - let first = vs[0]; // pairs of nodes from vs one after the next diff --git a/raphtory/src/core/entities/graph/logical_to_physical.rs b/raphtory/src/core/entities/graph/logical_to_physical.rs index 25150ec246..374fe98c86 100644 --- a/raphtory/src/core/entities/graph/logical_to_physical.rs +++ b/raphtory/src/core/entities/graph/logical_to_physical.rs @@ -11,7 +11,6 @@ use crate::core::utils::errors::{GraphError, MutateGraphError}; #[derive(Debug, Deserialize, Serialize)] enum Map { U64(FxDashMap), - I64(FxDashMap), Str(FxDashMap), } @@ -29,13 +28,6 @@ impl Map { _ => None, } } - - fn as_i64(&self) -> Option<&FxDashMap> { - match self { - Map::I64(map) => Some(map), - _ => None, - } - } } impl Default for Map { diff --git a/raphtory/src/db/api/view/internal/core_ops.rs b/raphtory/src/db/api/view/internal/core_ops.rs index ccf0b6526c..0cc13c88fc 100644 --- a/raphtory/src/db/api/view/internal/core_ops.rs +++ b/raphtory/src/db/api/view/internal/core_ops.rs @@ -154,6 +154,7 @@ pub trait CoreGraphOps { } /// Returns the type of node + #[inline] fn node_type(&self, v: VID) -> Option { let type_id = self.node_type_id(v); self.node_meta().get_node_type_name_by_id(type_id) diff --git a/raphtory/src/db/api/view/serialise.rs b/raphtory/src/db/api/view/serialise.rs index 0c331ef4ef..bdf859527c 100644 --- a/raphtory/src/db/api/view/serialise.rs +++ b/raphtory/src/db/api/view/serialise.rs @@ -200,10 +200,9 @@ impl<'graph, G: GraphViewOps<'graph>> StableEncoder for G { graph.nodes = self .nodes() .into_iter() - .map(|n: crate::db::graph::node::NodeView| { + .map(|n| { let gid = n.id(); let vid = n.node; - let node = self.core_node_entry(vid); let proto_gid = match gid { GID::U64(g) => Gid { gid: Some(gid::Gid::GidU64(g)), diff --git a/raphtory/src/db/mod.rs b/raphtory/src/db/mod.rs index 4b38207a69..22537f5b97 100644 --- a/raphtory/src/db/mod.rs +++ b/raphtory/src/db/mod.rs @@ -1,5 +1,4 @@ pub mod api; pub mod graph; -// pub(crate) mod internal; pub mod task; pub mod utils; diff --git a/raphtory/src/io/arrow/dataframe.rs b/raphtory/src/io/arrow/dataframe.rs index 328edf2006..2b8173f5e1 100644 --- a/raphtory/src/io/arrow/dataframe.rs +++ b/raphtory/src/io/arrow/dataframe.rs @@ -17,6 +17,13 @@ pub(crate) struct DFView { } impl DFView { + pub(crate) fn get_inner_size(&self) -> usize { + if self.arrays.is_empty() || self.arrays[0].is_empty() { + return 0; + } + self.arrays[0][0].len() + } + pub fn check_cols_exist(&self, cols: &[&str]) -> Result<(), GraphError> { let non_cols: Vec<&&str> = cols .iter() diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 04b84ecb2f..beec0ddbd1 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -2,12 +2,13 @@ use crate::{ core::utils::errors::GraphError, db::api::{ mutation::{internal::*, AdditionOps}, - storage::storage_ops::GraphStorage, view::StaticGraphViewOps, }, io::arrow::{dataframe::DFView, prop_handler::*}, prelude::*, }; +#[cfg(feature = "python")] +use kdam::tqdm; use std::{collections::HashMap, iter}; pub(crate) fn load_nodes_from_df< @@ -15,6 +16,7 @@ pub(crate) fn load_nodes_from_df< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( df: &'a DFView, + size: usize, node_id: &str, time: &str, properties: Option>, @@ -55,6 +57,7 @@ pub(crate) fn load_nodes_from_df< .map(|((node_id, time), n_t)| (node_id, time, n_t)); load_nodes_from_num_iter( graph, + size, iter, prop_iter, const_prop_iter, @@ -70,6 +73,7 @@ pub(crate) fn load_nodes_from_df< load_nodes_from_num_iter( graph, + size, iter, prop_iter, const_prop_iter, @@ -81,8 +85,19 @@ pub(crate) fn load_nodes_from_df< .zip(node_type) .map(|((node_id, time), n_t)| (node_id, time, n_t)); - for (((node_id, time, n_t), props), const_props) in iter.zip(prop_iter).zip(const_prop_iter) - { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(prop_iter).zip(const_prop_iter), + desc = "Loading nodes", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = iter.zip(prop_iter).zip(const_prop_iter); + + for (((node_id, time, n_t), props), const_props) in iter { if let (Some(node_id), Some(time), n_t) = (node_id, time, n_t) { let actual_type = extract_out_default_type(n_t); let v = graph.add_node(time, node_id, props, actual_type)?; @@ -98,8 +113,19 @@ pub(crate) fn load_nodes_from_df< .zip(node_type) .map(|((node_id, time), n_t)| (node_id, time, n_t)); - for (((node_id, time, n_t), props), const_props) in iter.zip(prop_iter).zip(const_prop_iter) - { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(prop_iter).zip(const_prop_iter), + desc = "Loading nodes", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = iter.zip(prop_iter).zip(const_prop_iter); + + for (((node_id, time, n_t), props), const_props) in iter { let actual_type = extract_out_default_type(n_t); if let (Some(node_id), Some(time), n_t) = (node_id, time, actual_type) { let v = graph.add_node(time, node_id, props, n_t)?; @@ -132,6 +158,7 @@ pub(crate) fn load_edges_from_df< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( df: &'a DFView, + size: usize, src: &str, dst: &str, time: &str, @@ -156,6 +183,7 @@ pub(crate) fn load_edges_from_df< .zip(time); load_edges_from_num_iter( graph, + size, triplets, prop_iter, const_prop_iter, @@ -173,6 +201,7 @@ pub(crate) fn load_edges_from_df< .zip(time); load_edges_from_num_iter( graph, + size, triplets, prop_iter, const_prop_iter, @@ -186,9 +215,19 @@ pub(crate) fn load_edges_from_df< ) { let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - for (((((src, dst), time), props), const_props), layer) in - triplets.zip(prop_iter).zip(const_prop_iter).zip(layer) - { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(prop_iter).zip(const_prop_iter).zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(prop_iter).zip(const_prop_iter).zip(layer); + + for (((((src, dst), time), props), const_props), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; e.add_constant_properties(const_props, layer.as_deref())?; @@ -203,9 +242,19 @@ pub(crate) fn load_edges_from_df< df.time_iter_col(time), ) { let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - for (((((src, dst), time), props), const_props), layer) in - triplets.zip(prop_iter).zip(const_prop_iter).zip(layer) - { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(prop_iter).zip(const_prop_iter).zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(prop_iter).zip(const_prop_iter).zip(layer); + + for (((((src, dst), time), props), const_props), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { let e = graph.add_edge(time, src, dst, props, layer.as_deref())?; e.add_constant_properties(const_props, layer.as_deref())?; @@ -229,6 +278,7 @@ pub(crate) fn load_edges_deletions_from_df< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps + DeletionOps, >( df: &'a DFView, + size: usize, src: &str, dst: &str, time: &str, @@ -247,7 +297,20 @@ pub(crate) fn load_edges_deletions_from_df< .map(|i| i.copied()) .zip(dst.map(|i| i.copied())) .zip(time); - for (((src, dst), time), layer) in triplets.zip(layer) { + + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(layer); + + for (((src, dst), time), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { graph.delete_edge(time, src, dst, layer.as_deref())?; } @@ -261,7 +324,20 @@ pub(crate) fn load_edges_deletions_from_df< .map(i64_opt_into_u64_opt) .zip(dst.map(i64_opt_into_u64_opt)) .zip(time); - for (((src, dst), time), layer) in triplets.zip(layer) { + + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(layer); + + for (((src, dst), time), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { graph.delete_edge(time, src, dst, layer.as_deref())?; } @@ -272,7 +348,18 @@ pub(crate) fn load_edges_deletions_from_df< df.time_iter_col(time), ) { let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - for (((src, dst), time), layer) in triplets.zip(layer) { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = triplets.zip(layer); + + for (((src, dst), time), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { graph.delete_edge(time, src, dst, layer.as_deref())?; } @@ -283,7 +370,18 @@ pub(crate) fn load_edges_deletions_from_df< df.time_iter_col(time), ) { let triplets = src.into_iter().zip(dst.into_iter()).zip(time.into_iter()); - for (((src, dst), time), layer) in triplets.zip(layer) { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = triplets.zip(layer); + + for (((src, dst), time), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { graph.delete_edge(time, src, dst, layer.as_deref())?; } @@ -302,6 +400,7 @@ pub(crate) fn load_node_props_from_df< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( df: &'a DFView, + size: usize, node_id: &str, const_properties: Option>, shared_const_properties: Option>, @@ -311,7 +410,18 @@ pub(crate) fn load_node_props_from_df< if let Some(node_id) = df.iter_col::(node_id) { let iter = node_id.map(|i| i.copied()); - for (node_id, const_props) in iter.zip(const_prop_iter) { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(const_prop_iter), + desc = "Loading node properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = iter.zip(const_prop_iter); + + for (node_id, const_props) in iter { if let Some(node_id) = node_id { let v = graph .node(node_id) @@ -324,7 +434,18 @@ pub(crate) fn load_node_props_from_df< } } else if let Some(node_id) = df.iter_col::(node_id) { let iter = node_id.map(i64_opt_into_u64_opt); - for (node_id, const_props) in iter.zip(const_prop_iter) { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(const_prop_iter), + desc = "Loading node properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = iter.zip(const_prop_iter); + + for (node_id, const_props) in iter { if let Some(node_id) = node_id { let v = graph .node(node_id) @@ -337,7 +458,18 @@ pub(crate) fn load_node_props_from_df< } } else if let Some(node_id) = df.utf8::(node_id) { let iter = node_id.into_iter(); - for (node_id, const_props) in iter.zip(const_prop_iter) { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(const_prop_iter), + desc = "Loading node properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = iter.zip(const_prop_iter); + + for (node_id, const_props) in iter { if let Some(node_id) = node_id { let v = graph .node(node_id) @@ -350,7 +482,18 @@ pub(crate) fn load_node_props_from_df< } } else if let Some(node_id) = df.utf8::(node_id) { let iter = node_id.into_iter(); - for (node_id, const_props) in iter.zip(const_prop_iter) { + #[cfg(feature = "python")] + let iter = tqdm!( + iter.zip(const_prop_iter), + desc = "Loading node properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + #[cfg(not(feature = "python"))] + let iter = iter.zip(const_prop_iter); + + for (node_id, const_props) in iter { if let Some(node_id) = node_id { let v = graph .node(node_id) @@ -369,24 +512,40 @@ pub(crate) fn load_node_props_from_df< Ok(()) } -pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( +pub(crate) fn load_edges_props_from_df< + 'a, + S: AsRef, + G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, +>( df: &'a DFView, + size: usize, src: &str, dst: &str, const_properties: Option>, shared_const_properties: Option>, layer: Option, layer_in_df: bool, - graph: &GraphStorage, + graph: &G, ) -> Result<(), GraphError> { - let graph = Graph::from_internal_graph(graph); let (_, const_prop_iter) = get_prop_rows(df, None, const_properties)?; let layer = lift_layer(layer, layer_in_df, df); if let (Some(src), Some(dst)) = (df.iter_col::(src), df.iter_col::(dst)) { let triplets = src.map(|i| i.copied()).zip(dst.map(|i| i.copied())); - for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(const_prop_iter).zip(layer), + desc = "Loading edge properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(const_prop_iter).zip(layer); + + for (((src, dst), const_props), layer) in iter { if let (Some(src), Some(dst)) = (src, dst) { let e = graph .edge(src, dst) @@ -401,7 +560,20 @@ pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( let triplets = src .map(i64_opt_into_u64_opt) .zip(dst.map(i64_opt_into_u64_opt)); - for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(const_prop_iter).zip(layer), + desc = "Loading edge properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(const_prop_iter).zip(layer); + + for (((src, dst), const_props), layer) in iter { if let (Some(src), Some(dst)) = (src, dst) { let e = graph .edge(src, dst) @@ -414,7 +586,19 @@ pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( } } else if let (Some(src), Some(dst)) = (df.utf8::(src), df.utf8::(dst)) { let triplets = src.into_iter().zip(dst.into_iter()); - for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(const_prop_iter).zip(layer), + desc = "Loading edge properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(const_prop_iter).zip(layer); + + for (((src, dst), const_props), layer) in iter { if let (Some(src), Some(dst)) = (src, dst) { let e = graph .edge(src, dst) @@ -430,7 +614,20 @@ pub(crate) fn load_edges_props_from_df<'a, S: AsRef>( } } else if let (Some(src), Some(dst)) = (df.utf8::(src), df.utf8::(dst)) { let triplets = src.into_iter().zip(dst.into_iter()); - for (((src, dst), const_props), layer) in triplets.zip(const_prop_iter).zip(layer) { + + #[cfg(feature = "python")] + let iter = tqdm!( + triplets.zip(const_prop_iter).zip(layer), + desc = "Loading edge properties", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = triplets.zip(const_prop_iter).zip(layer); + + for (((src, dst), const_props), layer) in iter { if let (Some(src), Some(dst)) = (src, dst) { let e = graph .edge(src, dst) @@ -466,15 +663,26 @@ fn load_edges_from_num_iter< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( graph: &G, + size: usize, edges: I, properties: PI, const_properties: PI, shared_const_properties: Option>, layer: IL, ) -> Result<(), GraphError> { - for (((((src, dst), time), edge_props), const_props), layer) in - edges.zip(properties).zip(const_properties).zip(layer) - { + #[cfg(feature = "python")] + let iter = tqdm!( + edges.zip(properties).zip(const_properties).zip(layer), + desc = "Loading edges", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = edges.zip(properties).zip(const_properties).zip(layer); + + for (((((src, dst), time), edge_props), const_props), layer) in iter { if let (Some(src), Some(dst), Some(time)) = (src, dst, time) { let e = graph.add_edge(time, src, dst, edge_props, layer.as_deref())?; e.add_constant_properties(const_props, layer.as_deref())?; @@ -494,14 +702,24 @@ fn load_nodes_from_num_iter< G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps, >( graph: &G, + size: usize, nodes: I, properties: PI, const_properties: PI, shared_const_properties: Option>, ) -> Result<(), GraphError> { - for (((node, time, node_type), props), const_props) in - nodes.zip(properties).zip(const_properties) - { + #[cfg(feature = "python")] + let iter = tqdm!( + nodes.zip(properties).zip(const_properties), + desc = "Loading nodes", + total = size, + animation = kdam::Animation::FillUp, + unit_scale = true + ); + + #[cfg(not(feature = "python"))] + let iter = nodes.zip(properties).zip(const_properties); + for (((node, time, node_type), props), const_props) in iter { if let (Some(v), Some(t), n_t, props, const_props) = (node, time, node_type, props, const_props) { diff --git a/raphtory/src/io/parquet_loaders.rs b/raphtory/src/io/parquet_loaders.rs index 7df003705f..eeb4084b9c 100644 --- a/raphtory/src/io/parquet_loaders.rs +++ b/raphtory/src/io/parquet_loaders.rs @@ -49,8 +49,10 @@ pub fn load_nodes_from_parquet< for path in get_parquet_file_paths(parquet_path)? { let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; + let size = df.get_inner_size(); load_nodes_from_df( &df, + size, id, time, properties.clone(), @@ -92,8 +94,10 @@ pub fn load_edges_from_parquet< for path in get_parquet_file_paths(parquet_path)? { let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); load_edges_from_df( &df, + size, src, dst, time, @@ -125,8 +129,10 @@ pub fn load_node_props_from_parquet< for path in get_parquet_file_paths(parquet_path)? { let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); load_node_props_from_df( &df, + size, id, const_properties.clone(), shared_const_properties.clone(), @@ -161,8 +167,10 @@ pub fn load_edge_props_from_parquet< for path in get_parquet_file_paths(parquet_path)? { let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); load_edges_props_from_df( &df, + size, src, dst, const_properties.clone(), @@ -198,8 +206,10 @@ pub fn load_edges_deletions_from_parquet< for path in get_parquet_file_paths(parquet_path)? { let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; + let size = cols_to_check.len(); load_edges_deletions_from_df( &df, + size, src, dst, time, diff --git a/raphtory/src/python/graph/io/pandas_loaders.rs b/raphtory/src/python/graph/io/pandas_loaders.rs index a22f50ad91..b211de46a0 100644 --- a/raphtory/src/python/graph/io/pandas_loaders.rs +++ b/raphtory/src/python/graph/io/pandas_loaders.rs @@ -20,6 +20,14 @@ pub fn load_nodes_from_pandas( shared_const_properties: Option>, ) -> Result<(), GraphError> { Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; + let mut cols_to_check = vec![id, time]; cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new())); cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); @@ -34,6 +42,7 @@ pub fn load_nodes_from_pandas( load_nodes_from_df( &df, + size, id, time, properties, @@ -63,6 +72,14 @@ pub fn load_edges_from_pandas( layer_in_df: Option, ) -> Result<(), GraphError> { Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; + let mut cols_to_check = vec![src, dst, time]; cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new())); cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); @@ -77,6 +94,7 @@ pub fn load_edges_from_pandas( df.check_cols_exist(&cols_to_check)?; load_edges_from_df( &df, + size, src, dst, time, @@ -103,13 +121,27 @@ pub fn load_node_props_from_pandas( shared_const_properties: Option>, ) -> Result<(), GraphError> { Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; let mut cols_to_check = vec![id]; cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new())); let df = process_pandas_py_df(df, py, cols_to_check.clone())?; df.check_cols_exist(&cols_to_check)?; - load_node_props_from_df(&df, id, const_properties, shared_const_properties, graph) - .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?; + load_node_props_from_df( + &df, + size, + id, + const_properties, + shared_const_properties, + graph, + ) + .map_err(|e| GraphLoadException::new_err(format!("{:?}", e)))?; Ok::<(), PyErr>(()) }) @@ -128,6 +160,13 @@ pub fn load_edge_props_from_pandas( layer_in_df: Option, ) -> Result<(), GraphError> { Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; let mut cols_to_check = vec![src, dst]; if layer_in_df.unwrap_or(false) { if let Some(ref layer) = layer { @@ -139,6 +178,7 @@ pub fn load_edge_props_from_pandas( df.check_cols_exist(&cols_to_check)?; load_edges_props_from_df( &df, + size, src, dst, const_properties, @@ -165,6 +205,14 @@ pub fn load_edges_deletions_from_pandas( layer_in_df: Option, ) -> Result<(), GraphError> { Python::with_gil(|py| { + let size: usize = py + .eval( + "index.__len__()", + Some([("index", df.getattr("index")?)].into_py_dict(py)), + None, + )? + .extract()?; + let mut cols_to_check = vec![src, dst, time]; if layer_in_df.unwrap_or(true) { if let Some(ref layer) = layer { @@ -177,6 +225,7 @@ pub fn load_edges_deletions_from_pandas( load_edges_deletions_from_df( &df, + size, src, dst, time,