Skip to content

Commit

Permalink
add public interface for disk merge and start testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ljeub-pometry committed Jul 2, 2024
1 parent 61c2e42 commit 27fd5d4
Showing 1 changed file with 134 additions and 1 deletion.
135 changes: 134 additions & 1 deletion raphtory/src/disk_graph/graph_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
};
use pometry_storage::{
disk_hmap::DiskHashMap, graph::TemporalGraph, graph_fragment::TempColGraphFragment,
load::ExternalEdgeList, RAError,
load::ExternalEdgeList, merge::merge_graph::merge_graphs, RAError,
};
use raphtory_api::core::storage::timeindex::TimeIndexEntry;
use rayon::prelude::*;
Expand Down Expand Up @@ -175,6 +175,16 @@ impl DiskGraph {
.expect("failed to create graph")
}

pub fn merge_by_sorted_gids(
&self,
other: &DiskGraph,
new_graph_dir: impl AsRef<Path>,
) -> Result<DiskGraph, Error> {
let graph_dir = new_graph_dir.as_ref();
let inner = merge_graphs(graph_dir, &self.inner, &other.inner)?;
Ok(DiskGraph::new(inner, graph_dir.to_path_buf()))
}

fn new(inner_graph: TemporalGraph, graph_dir: PathBuf) -> Self {
let node_meta = Meta::new();
let mut edge_meta = Meta::new();
Expand Down Expand Up @@ -1091,3 +1101,126 @@ mod test {
);
}
}

#[cfg(feature = "storage")]
#[cfg(test)]
mod storage_tests {
use crate::{
db::graph::graph::assert_graph_equal,
prelude::{AdditionOps, Graph, GraphViewOps, NO_PROPS},
};
use proptest::prelude::*;
use std::collections::BTreeSet;
use tempfile::TempDir;

#[test]
fn test_merge() {
let g1 = Graph::new();
g1.add_node(0, 0, [("node_prop", 0f64)], Some("1")).unwrap();
g1.add_node(0, 1, NO_PROPS, None).unwrap();
g1.add_node(0, 2, [("node_prop", 2f64)], Some("2")).unwrap();
g1.add_edge(1, 0, 1, [("test", 1i32)], None).unwrap();
g1.add_edge(2, 0, 1, [("test", 2i32)], None).unwrap();
g1.add_edge(2, 1, 2, [("test2", "test")], None).unwrap();
g1.edge(0, 1)
.unwrap()
.add_constant_properties([("const_str", "test")], None)
.unwrap();
g1.node(0)
.unwrap()
.add_updates(3, [("test", "test")])
.unwrap();

let g2 = Graph::new();
g2.add_node(1, 0, [("node_prop", 1f64)], Some("1")).unwrap();
g2.add_node(0, 1, NO_PROPS, None).unwrap();
g2.add_node(3, 2, [("node_prop", 3f64)], Some("2")).unwrap();
g2.add_edge(1, 0, 1, [("test", 2i32)], None).unwrap();
g2.add_edge(3, 0, 1, [("test", 3i32)], None).unwrap();
g2.add_edge(2, 1, 2, [("test2", "test")], None).unwrap();
g2.edge(0, 1)
.unwrap()
.add_constant_properties([("const_str", "test2")], None)
.unwrap();
g2.node(0)
.unwrap()
.add_updates(3, [("test", "test")])
.unwrap();
let g1_dir = TempDir::new().unwrap();
let g2_dir = TempDir::new().unwrap();
let gm_dir = TempDir::new().unwrap();

let g1_a = g1.persist_as_disk_graph(&g1_dir).unwrap();
let g2_a = g2.persist_as_disk_graph(&g2_dir).unwrap();

let gm = g1_a.merge_by_sorted_gids(&g2_a, &gm_dir).unwrap();
}

fn add_edges(g: &Graph, edges: &[(i64, u64, u64)]) {
let nodes: BTreeSet<_> = edges
.iter()
.flat_map(|(_, src, dst)| [*src, *dst])
.collect();
for n in nodes {
g.add_node(0, n, NO_PROPS, None).unwrap();
}
for (t, src, dst) in edges {
g.add_edge(*t, *src, *dst, NO_PROPS, None).unwrap();
}
}

fn inner_merge_test(left_edges: &[(i64, u64, u64)], right_edges: &[(i64, u64, u64)]) {
let left_g = Graph::new();
add_edges(&left_g, left_edges);
let right_g = Graph::new();
add_edges(&right_g, right_edges);
let merged_g_expected = Graph::new();
add_edges(&merged_g_expected, left_edges);
add_edges(&merged_g_expected, right_edges);

let left_dir = TempDir::new().unwrap();
let right_dir = TempDir::new().unwrap();
let merged_dir = TempDir::new().unwrap();

let left_g_disk = left_g.persist_as_disk_graph(&left_dir).unwrap();
let right_g_disk = right_g.persist_as_disk_graph(&right_dir).unwrap();

let merged_g_disk = left_g_disk
.merge_by_sorted_gids(&right_g_disk, &merged_dir)
.unwrap();
assert_graph_equal(&merged_g_disk, &merged_g_expected)
}

#[test]
fn test_merge_proptest() {
proptest!(|(left_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100), right_edges in prop::collection::vec((0i64..10, 0u64..10, 0u64..10), 0..=100))| {
inner_merge_test(&left_edges, &right_edges)
})
}

#[test]
fn test_empty_graphs() {
inner_merge_test(&[], &[])
}

#[test]
fn test_merge_1_edge() {
let g1 = Graph::new();
g1.add_node(0, 0, NO_PROPS, None).unwrap();
g1.add_node(0, 1, NO_PROPS, None).unwrap();
g1.add_edge(1, 0, 1, NO_PROPS, None).unwrap();

let g2 = Graph::new();
g2.add_node(0, 0, NO_PROPS, None).unwrap();
g2.add_edge(1, 0, 0, NO_PROPS, None).unwrap();

let g1_dir = TempDir::new().unwrap();
let g2_dir = TempDir::new().unwrap();
let gm_dir = TempDir::new().unwrap();

let g1_a = g1.persist_as_disk_graph(&g1_dir).unwrap();
let g2_a = g2.persist_as_disk_graph(&g2_dir).unwrap();

let gm = g1_a.merge_by_sorted_gids(&g2_a, &gm_dir).unwrap();
}
}

0 comments on commit 27fd5d4

Please sign in to comment.