From 27fd5d4e8cf288f6a3abd3097838eec0e95b2622 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Mon, 1 Jul 2024 13:55:50 +0200 Subject: [PATCH] add public interface for disk merge and start testing --- raphtory/src/disk_graph/graph_impl/mod.rs | 135 +++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/raphtory/src/disk_graph/graph_impl/mod.rs b/raphtory/src/disk_graph/graph_impl/mod.rs index 28809998c2..b14ba813a5 100644 --- a/raphtory/src/disk_graph/graph_impl/mod.rs +++ b/raphtory/src/disk_graph/graph_impl/mod.rs @@ -20,7 +20,7 @@ use crate::{ }; use pometry_storage::{ disk_hmap::DiskHashMap, graph::TemporalGraph, graph_fragment::TempColGraphFragment, - load::ExternalEdgeList, RAError, + load::ExternalEdgeList, merge::merge_graph::merge_graphs, RAError, }; use raphtory_api::core::storage::timeindex::TimeIndexEntry; use rayon::prelude::*; @@ -175,6 +175,16 @@ impl DiskGraph { .expect("failed to create graph") } + pub fn merge_by_sorted_gids( + &self, + other: &DiskGraph, + new_graph_dir: impl AsRef, + ) -> Result { + let graph_dir = new_graph_dir.as_ref(); + let inner = merge_graphs(graph_dir, &self.inner, &other.inner)?; + Ok(DiskGraph::new(inner, graph_dir.to_path_buf())) + } + fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self { let node_meta = Meta::new(); let mut edge_meta = Meta::new(); @@ -1091,3 +1101,126 @@ mod test { ); } } + +#[cfg(feature = "storage")] +#[cfg(test)] +mod storage_tests { + use crate::{ + db::graph::graph::assert_graph_equal, + prelude::{AdditionOps, Graph, GraphViewOps, NO_PROPS}, + }; + use proptest::prelude::*; + use std::collections::BTreeSet; + use tempfile::TempDir; + + #[test] + fn test_merge() { + let g1 = Graph::new(); + g1.add_node(0, 0, [("node_prop", 0f64)], Some("1")).unwrap(); + g1.add_node(0, 1, NO_PROPS, None).unwrap(); + g1.add_node(0, 2, [("node_prop", 2f64)], Some("2")).unwrap(); + g1.add_edge(1, 0, 1, [("test", 1i32)], None).unwrap(); + g1.add_edge(2, 0, 1, [("test", 2i32)], None).unwrap(); + g1.add_edge(2, 1, 2, [("test2", "test")], None).unwrap(); + g1.edge(0, 1) + .unwrap() + .add_constant_properties([("const_str", "test")], None) + .unwrap(); + g1.node(0) + .unwrap() + .add_updates(3, [("test", "test")]) + .unwrap(); + + let g2 = Graph::new(); + g2.add_node(1, 0, [("node_prop", 1f64)], Some("1")).unwrap(); + g2.add_node(0, 1, NO_PROPS, None).unwrap(); + g2.add_node(3, 2, [("node_prop", 3f64)], Some("2")).unwrap(); + g2.add_edge(1, 0, 1, [("test", 2i32)], None).unwrap(); + g2.add_edge(3, 0, 1, [("test", 3i32)], None).unwrap(); + g2.add_edge(2, 1, 2, [("test2", "test")], None).unwrap(); + g2.edge(0, 1) + .unwrap() + .add_constant_properties([("const_str", "test2")], None) + .unwrap(); + g2.node(0) + .unwrap() + .add_updates(3, [("test", "test")]) + .unwrap(); + let g1_dir = TempDir::new().unwrap(); + let g2_dir = TempDir::new().unwrap(); + let gm_dir = TempDir::new().unwrap(); + + let g1_a = g1.persist_as_disk_graph(&g1_dir).unwrap(); + let g2_a = g2.persist_as_disk_graph(&g2_dir).unwrap(); + + let gm = g1_a.merge_by_sorted_gids(&g2_a, &gm_dir).unwrap(); + } + + fn add_edges(g: &Graph, edges: &[(i64, u64, u64)]) { + let nodes: BTreeSet<_> = edges + .iter() + .flat_map(|(_, src, dst)| [*src, *dst]) + .collect(); + for n in nodes { + g.add_node(0, n, NO_PROPS, None).unwrap(); + } + for (t, src, dst) in edges { + g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap(); + } + } + + fn inner_merge_test(left_edges: &[(i64, u64, u64)], right_edges: &[(i64, u64, u64)]) { + let left_g = Graph::new(); + add_edges(&left_g, left_edges); + let right_g = Graph::new(); + add_edges(&right_g, right_edges); + let merged_g_expected = Graph::new(); + add_edges(&merged_g_expected, left_edges); + add_edges(&merged_g_expected, right_edges); + + let left_dir = TempDir::new().unwrap(); + let right_dir = TempDir::new().unwrap(); + let merged_dir = TempDir::new().unwrap(); + + let left_g_disk = left_g.persist_as_disk_graph(&left_dir).unwrap(); + let right_g_disk = right_g.persist_as_disk_graph(&right_dir).unwrap(); + + let merged_g_disk = left_g_disk + .merge_by_sorted_gids(&right_g_disk, &merged_dir) + .unwrap(); + assert_graph_equal(&merged_g_disk, &merged_g_expected) + } + + #[test] + fn test_merge_proptest() { + proptest!(|(left_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100), right_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100))| { + inner_merge_test(&left_edges, &right_edges) + }) + } + + #[test] + fn test_empty_graphs() { + inner_merge_test(&[], &[]) + } + + #[test] + fn test_merge_1_edge() { + let g1 = Graph::new(); + g1.add_node(0, 0, NO_PROPS, None).unwrap(); + g1.add_node(0, 1, NO_PROPS, None).unwrap(); + g1.add_edge(1, 0, 1, NO_PROPS, None).unwrap(); + + let g2 = Graph::new(); + g2.add_node(0, 0, NO_PROPS, None).unwrap(); + g2.add_edge(1, 0, 0, NO_PROPS, None).unwrap(); + + let g1_dir = TempDir::new().unwrap(); + let g2_dir = TempDir::new().unwrap(); + let gm_dir = TempDir::new().unwrap(); + + let g1_a = g1.persist_as_disk_graph(&g1_dir).unwrap(); + let g2_a = g2.persist_as_disk_graph(&g2_dir).unwrap(); + + let gm = g1_a.merge_by_sorted_gids(&g2_a, &gm_dir).unwrap(); + } +}