Skip to content

Commit

Permalink
fix issue with valid_layers
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamka1 committed Jun 26, 2024
1 parent 05a586a commit 07d12f3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 37 deletions.
26 changes: 17 additions & 9 deletions raphtory/src/core/entities/graph/tgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ impl Default for InternalGraph {
}

impl TemporalGraph {
fn get_valid_layers(edge_meta: &Arc<Meta>) -> Vec<String> {
edge_meta
.layer_meta()
.get_keys()
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
}

pub(crate) fn num_layers(&self) -> usize {
self.edge_meta.layer_meta().len()
}
Expand All @@ -139,27 +148,26 @@ impl TemporalGraph {
}

pub(crate) fn layer_ids(&self, key: Layer) -> Result<LayerIds, GraphError> {
let valid_layers = self
.edge_meta
.layer_meta()
.get_keys()
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>();
match key {
Layer::None => Ok(LayerIds::None),
Layer::All => Ok(LayerIds::All),
Layer::Default => Ok(LayerIds::One(0)),
Layer::One(id) => match self.edge_meta.get_layer_id(&id) {
Some(id) => Ok(LayerIds::One(id)),
None => Err(GraphError::invalid_layer(id.to_string(), valid_layers)),
None => Err(GraphError::invalid_layer(
id.to_string(),
Self::get_valid_layers(&self.edge_meta),
)),
},
Layer::Multiple(ids) => {
let mut new_layers = ids
.iter()
.map(|id| {
self.edge_meta.get_layer_id(id).ok_or_else(|| {
GraphError::invalid_layer(id.to_string(), valid_layers.clone())
GraphError::invalid_layer(
id.to_string(),
Self::get_valid_layers(&self.edge_meta),
)
})
})
.collect::<Result<Vec<_>, GraphError>>()?;
Expand Down
2 changes: 2 additions & 0 deletions raphtory/src/core/utils/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum GraphError {
#[cfg(feature = "arrow")]
#[error("Arrow error: {0}")]
Arrow(#[from] error::PolarsError),
#[error("Invalid path = {0}")]
InvalidPath(String),
#[error("Graph error occurred")]
UnsupportedDataType,
#[error("Graph already exists by name = {name}")]
Expand Down
25 changes: 17 additions & 8 deletions raphtory/src/db/graph/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ impl<'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>> BaseEdgeViewOps<
type BaseGraph = G;
type Graph = GH;

type ValueType<T> =T where T: 'graph;
type ValueType<T> = T
where
T: 'graph;
type PropType = Self;
type Nodes = NodeView<G, G>;
type Exploded = Edges<'graph, G, GH>;
Expand Down Expand Up @@ -162,26 +164,33 @@ impl<'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'graph>> BaseEdgeViewOps<
}

impl<G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps> EdgeView<G, G> {
fn get_valid_layers(graph: &G) -> Vec<String> {
graph.unique_layers().map(|l| l.0.to_string()).collect()
}

fn resolve_layer(&self, layer: Option<&str>, create: bool) -> Result<usize, GraphError> {
let valid_layers = self
.graph
.unique_layers()
.map(|l| l.0.to_string())
.collect::<Vec<_>>();
match layer {
Some(name) => match self.edge.layer() {
Some(l_id) => self
.graph
.get_layer_id(name)
.filter(|id| id == l_id)
.ok_or_else(|| GraphError::invalid_layer(name.to_owned(), valid_layers)),
.ok_or_else(|| {
GraphError::invalid_layer(
name.to_owned(),
Self::get_valid_layers(&self.graph),
)
}),
None => {
if create {
Ok(self.graph.resolve_layer(layer))
} else {
self.graph
.get_layer_id(name)
.ok_or(GraphError::invalid_layer(name.to_owned(), valid_layers))
.ok_or(GraphError::invalid_layer(
name.to_owned(),
Self::get_valid_layers(&self.graph),
))
}
}
},
Expand Down
29 changes: 17 additions & 12 deletions raphtory/src/disk_graph/graph_impl/layer_ops.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use super::DiskGraph;
use crate::{
core::{entities::LayerIds, utils::errors::GraphError},
db::api::view::internal::InternalLayerOps,
prelude::Layer,
};
use itertools::Itertools;
use pometry_storage::graph::TemporalGraph;
use std::sync::Arc;

use super::DiskGraph;
fn get_valid_layers(graph: &Arc<TemporalGraph>) -> Vec<String> {
graph
.layer_names()
.into_iter()
.map(|x| x.clone())
.collect_vec()
}

impl InternalLayerOps for DiskGraph {
fn layer_ids(&self) -> &LayerIds {
Expand All @@ -17,20 +26,13 @@ impl InternalLayerOps for DiskGraph {
}

fn layer_ids_from_names(&self, key: Layer) -> Result<LayerIds, GraphError> {
let valid_layers = self
.inner
.layer_names()
.into_iter()
.map(|x| x.clone())
.collect_vec();
match key {
Layer::All => Ok(LayerIds::All),
Layer::Default => Ok(LayerIds::One(0)),
Layer::One(name) => {
let id = self
.inner
.find_layer_id(&name)
.ok_or_else(|| GraphError::invalid_layer(name.to_string(), valid_layers))?;
let id = self.inner.find_layer_id(&name).ok_or_else(|| {
GraphError::invalid_layer(name.to_string(), get_valid_layers(&self.inner))
})?;
Ok(LayerIds::One(id))
}
Layer::None => Ok(LayerIds::None),
Expand All @@ -39,7 +41,10 @@ impl InternalLayerOps for DiskGraph {
.iter()
.map(|name| {
self.inner.find_layer_id(name).ok_or_else(|| {
GraphError::invalid_layer(name.to_string(), valid_layers.clone())
GraphError::invalid_layer(
name.to_string(),
get_valid_layers(&self.inner),
)
})
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
16 changes: 8 additions & 8 deletions raphtory/src/io/parquet_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn load_nodes_from_parquet<
}
}

for path in get_parquet_file_paths(parquet_path) {
for path in get_parquet_file_paths(parquet_path)? {
let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
df.check_cols_exist(&cols_to_check)?;
let size = df.get_inner_size();
Expand Down Expand Up @@ -91,7 +91,7 @@ pub fn load_edges_from_parquet<
}
}

for path in get_parquet_file_paths(parquet_path) {
for path in get_parquet_file_paths(parquet_path)? {
let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
df.check_cols_exist(&cols_to_check)?;
let size = cols_to_check.len();
Expand Down Expand Up @@ -126,7 +126,7 @@ pub fn load_node_props_from_parquet<
let mut cols_to_check = vec![id];
cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));

for path in get_parquet_file_paths(parquet_path) {
for path in get_parquet_file_paths(parquet_path)? {
let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
df.check_cols_exist(&cols_to_check)?;
let size = cols_to_check.len();
Expand Down Expand Up @@ -164,7 +164,7 @@ pub fn load_edge_props_from_parquet<
}
cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));

for path in get_parquet_file_paths(parquet_path) {
for path in get_parquet_file_paths(parquet_path)? {
let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
df.check_cols_exist(&cols_to_check)?;
let size = cols_to_check.len();
Expand Down Expand Up @@ -203,7 +203,7 @@ pub fn load_edges_deletions_from_parquet<
}
}

for path in get_parquet_file_paths(parquet_path) {
for path in get_parquet_file_paths(parquet_path)? {
let df = process_parquet_file_to_df(path.as_path(), cols_to_check.clone())?;
df.check_cols_exist(&cols_to_check)?;
let size = cols_to_check.len();
Expand Down Expand Up @@ -284,7 +284,7 @@ fn read_parquet_file(
Ok((names, reader))
}

fn get_parquet_file_paths(parquet_path: &Path) -> Vec<PathBuf> {
fn get_parquet_file_paths(parquet_path: &Path) -> Result<Vec<PathBuf>, GraphError> {
let mut parquet_files = Vec::new();
if parquet_path.is_file() {
parquet_files.push(parquet_path.to_path_buf());
Expand All @@ -297,10 +297,10 @@ fn get_parquet_file_paths(parquet_path: &Path) -> Vec<PathBuf> {
}
}
} else {
println!("Invalid path provided: {:?}", parquet_path);
return Err(GraphError::InvalidPath(parquet_path.display().to_string()));
}

parquet_files
Ok(parquet_files)
}

#[cfg(test)]
Expand Down

0 comments on commit 07d12f3

Please sign in to comment.