Skip to content

Commit

Permalink
Feature/more merge tests (#1702)
Browse files Browse the repository at this point in the history
* add graph_dir to DiskGraphStorage

* make load_from_parquet easier to use in rust

* update submodule and fix warnings
  • Loading branch information
ljeub-pometry authored Jul 31, 2024
1 parent a6371a2 commit 7e0db58
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pometry-storage-private
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ mod lpa_tests {
let seed = Some([5; 32]);
let result = label_propagation(graph, seed).unwrap();

let ids = result
.iter()
.map(|n_set| n_set.iter().map(|n| n.node).collect::<Vec<_>>())
.collect::<Vec<_>>();

let expected = vec![
HashSet::from([
graph.node("R1").unwrap(),
Expand Down
23 changes: 10 additions & 13 deletions raphtory/src/disk_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ pub struct DiskGraphStorage {
node_meta: Arc<Meta>,
edge_meta: Arc<Meta>,
graph_props: Arc<GraphMeta>,
graph_dir: PathBuf,
}

impl Serialize for DiskGraphStorage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let path = self.graph_dir.clone();
let path = self.graph_dir();
path.serialize(serializer)
}
}
Expand Down Expand Up @@ -95,7 +94,7 @@ impl AsRef<TemporalGraph> for DiskGraphStorage {

impl DiskGraphStorage {
pub fn graph_dir(&self) -> &Path {
&self.graph_dir
self.inner.graph_dir()
}

pub fn valid_layer_ids_from_names(&self, key: Layer) -> LayerIds {
Expand Down Expand Up @@ -228,10 +227,10 @@ impl DiskGraphStorage {
) -> Result<DiskGraphStorage, DiskGraphError> {
let graph_dir = new_graph_dir.as_ref();
let inner = merge_graphs(graph_dir, &self.inner, &other.inner)?;
Ok(DiskGraphStorage::new(inner, graph_dir.to_path_buf()))
Ok(DiskGraphStorage::new(inner))
}

fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self {
fn new(inner_graph: TemporalGraph) -> Self {
let node_meta = Meta::new();
let mut edge_meta = Meta::new();
let graph_meta = GraphMeta::new();
Expand Down Expand Up @@ -289,15 +288,14 @@ impl DiskGraphStorage {
node_meta: Arc::new(node_meta),
edge_meta: Arc::new(edge_meta),
graph_props: Arc::new(graph_meta),
graph_dir,
}
}

pub fn from_graph(graph: &Graph, graph_dir: impl AsRef<Path>) -> Result<Self, DiskGraphError> {
let inner_graph = TemporalGraph::from_graph(graph, graph_dir.as_ref(), || {
make_node_properties_from_graph(graph, graph_dir.as_ref())
})?;
Ok(Self::new(inner_graph, graph_dir.as_ref().to_path_buf()))
Ok(Self::new(inner_graph))
}

pub fn load_from_edge_lists(
Expand All @@ -309,7 +307,6 @@ impl DiskGraphStorage {
dst_col_idx: usize,
time_col_idx: usize,
) -> Result<Self, RAError> {
let path = graph_dir.as_ref().to_path_buf();
let inner = TemporalGraph::from_sorted_edge_list(
graph_dir,
src_col_idx,
Expand All @@ -319,13 +316,12 @@ impl DiskGraphStorage {
t_props_chunk_size,
edge_list,
)?;
Ok(Self::new(inner, path))
Ok(Self::new(inner))
}

pub fn load_from_dir(graph_dir: impl AsRef<Path>) -> Result<DiskGraphStorage, RAError> {
let path = graph_dir.as_ref().to_path_buf();
let inner = TemporalGraph::new(graph_dir)?;
Ok(Self::new(inner, path))
Ok(Self::new(inner))
}

pub fn load_from_parquets<P: AsRef<Path>>(
Expand Down Expand Up @@ -366,7 +362,7 @@ impl DiskGraphStorage {
node_properties.as_ref().map(|p| p.as_ref()),
node_type_col,
)?;
Ok(Self::new(t_graph, graph_dir.as_ref().to_path_buf()))
Ok(Self::new(t_graph))
}

pub fn filtered_layers_par<'a>(
Expand Down Expand Up @@ -401,12 +397,13 @@ impl DiskGraphStorage {
.expect("Failed to create global order");

let inner = TemporalGraph::new_from_layers(
path,
global_ordering,
Arc::new(global_order),
vec![layer],
vec!["_default".to_string()],
);
Self::new(inner, path)
Self::new(inner)
}

pub fn node_meta(&self) -> &Meta {
Expand Down
3 changes: 2 additions & 1 deletion raphtory/src/io/parquet_loaders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn load_edges_from_parquet<
G: StaticGraphViewOps + InternalPropertyAdditionOps + InternalAdditionOps,
>(
graph: &G,
parquet_path: &Path,
parquet_path: impl AsRef<Path>,
src: &str,
dst: &str,
time: &str,
Expand All @@ -82,6 +82,7 @@ pub fn load_edges_from_parquet<
layer: Option<&str>,
layer_in_df: Option<bool>,
) -> Result<(), GraphError> {
let parquet_path = parquet_path.as_ref();
let mut cols_to_check = vec![src, dst, time];
cols_to_check.extend(properties.as_ref().unwrap_or(&Vec::new()));
cols_to_check.extend(const_properties.as_ref().unwrap_or(&Vec::new()));
Expand Down

0 comments on commit 7e0db58

Please sign in to comment.