Skip to content

Commit

Permalink
add masked graph
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Nov 25, 2024
1 parent 3905856 commit 58af97c
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 9 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ numpy = "0.22.1"
itertools = "0.13.0"
rand = "0.8.5"
rayon = "1.8.1"
roaring = "0.10.6"
sorted_vector_map = "0.2.0"
tokio = { version = "1.36.0", features = ["full"] }
once_cell = "1.19.0"
Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
1 change: 1 addition & 0 deletions raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pometry-storage = { workspace = true, optional = true }

prost = { workspace = true, optional = true }
prost-types = { workspace = true, optional = true }
roaring = { workspace = true }

[dev-dependencies]
csv = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions raphtory/src/db/api/storage/graph/edges/edge_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
tprop_storage_ops::TPropOps,
},
};
use raphtory_api::core::entities::EID;
use rayon::prelude::*;
use std::ops::Range;

Expand Down Expand Up @@ -60,6 +61,10 @@ impl<'a, 'b: 'a> EdgeStorageOps<'a> for &'a EdgeStorageEntry<'b> {
self.as_ref().dst()
}

/// Returns the id of this edge by delegating to the referenced edge (`self.as_ref()`).
fn eid(self) -> EID {
self.as_ref().eid()
}

/// Returns an iterator of layer ids for this edge restricted by `layer_ids`,
/// delegating to the referenced edge (`self.as_ref()`).
fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator<Item = usize> + 'a {
self.as_ref().layer_ids_iter(layer_ids)
}
Expand Down
5 changes: 5 additions & 0 deletions raphtory/src/db/api/storage/graph/edges/edge_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
tprop_storage_ops::TPropOps,
},
};
use raphtory_api::core::entities::EID;
use rayon::prelude::*;
use std::ops::Range;

Expand Down Expand Up @@ -74,6 +75,10 @@ impl<'a> EdgeStorageOps<'a> for EdgeStorageRef<'a> {
for_all!(self, edge => edge.dst())
}

/// Returns the id of this edge by forwarding `eid()` to the wrapped edge through
/// the `for_all!` macro.
fn eid(self) -> EID {
for_all!(self, edge => edge.eid())
}

/// Returns an iterator of layer ids for this edge restricted by `layer_ids`,
/// forwarding to the wrapped edge's `EdgeStorageOps::layer_ids_iter` through
/// the `for_all_iter!` macro.
fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator<Item = usize> + 'a {
for_all_iter!(self, edge => EdgeStorageOps::layer_ids_iter(edge, layer_ids))
}
Expand Down
5 changes: 5 additions & 0 deletions raphtory/src/db/api/storage/graph/edges/edge_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub trait EdgeStorageOps<'a>: Copy + Sized + Send + Sync + 'a {
fn has_layer(self, layer_ids: &LayerIds) -> bool;
fn src(self) -> VID;
fn dst(self) -> VID;
/// Returns the `EID` of this edge.
fn eid(self) -> EID;

fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator<Item = usize> + 'a;

Expand Down Expand Up @@ -325,6 +326,10 @@ impl<'a> EdgeStorageOps<'a> for MemEdge<'a> {
self.edge_store().dst
}

/// Returns the id of this edge.
fn eid(self) -> EID {
// NOTE(review): this only terminates if `self.eid()` resolves to an inherent
// `MemEdge::eid` (not visible in this hunk) rather than to this trait method;
// otherwise the call is unconditionally recursive — confirm against `MemEdge`.
self.eid()
}

/// Builds an outgoing `EdgeRef` from this edge's id, source and destination.
fn out_ref(self) -> EdgeRef {
EdgeRef::new_outgoing(self.eid(), self.src(), self.dst())
}
Expand Down
9 changes: 8 additions & 1 deletion raphtory/src/db/api/view/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::{
node::NodeView,
nodes::Nodes,
views::{
node_subgraph::NodeSubgraph, node_type_filtered_subgraph::TypeFilteredSubgraph,
masked_graph::MaskedGraph, node_subgraph::NodeSubgraph,
node_type_filtered_subgraph::TypeFilteredSubgraph,
},
},
},
Expand Down Expand Up @@ -66,6 +67,8 @@ pub trait GraphViewOps<'graph>: BoxableGraphView + Sized + Clone + 'graph {

fn subgraph<I: IntoIterator<Item = V>, V: AsNodeRef>(&self, nodes: I) -> NodeSubgraph<Self>;

/// Returns a `MaskedGraph` wrapping this graph view.
fn masked(&self) -> MaskedGraph<Self>;

fn subgraph_node_types<I: IntoIterator<Item = V>, V: Borrow<str>>(
&self,
nodes_types: I,
Expand Down Expand Up @@ -345,6 +348,10 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
NodeSubgraph::new(self.clone(), nodes)
}

/// Builds a `MaskedGraph` from a clone of this view via `MaskedGraph::new`.
fn masked(&self) -> MaskedGraph<G> {
MaskedGraph::new(self.clone())
}

fn subgraph_node_types<I: IntoIterator<Item = V>, V: Borrow<str>>(
&self,
nodes_types: I,
Expand Down
211 changes: 211 additions & 0 deletions raphtory/src/db/graph/views/masked_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
use crate::{
core::entities::LayerIds,
db::api::{
properties::internal::InheritPropertiesOps,
storage::graph::{
edges::{edge_ref::EdgeStorageRef, edge_storage_ops::EdgeStorageOps},
nodes::{node_ref::NodeStorageRef, node_storage_ops::NodeStorageOps},
},
view::internal::{
Base, EdgeFilterOps, Immutable, InheritCoreOps, InheritLayerOps, InheritListOps,
InheritMaterialize, InheritTimeSemantics, NodeFilterOps, Static,
},
},
prelude::{GraphViewOps, LayerOps},
};
use roaring::RoaringTreemap;
use std::{
fmt::{Debug, Formatter},
sync::Arc,
};

/// A graph view that wraps `graph` together with per-layer bitmaps of the node and
/// edge ids that were present when the view was built (see `MaskedGraph::new`).
#[derive(Clone)]
pub struct MaskedGraph<G> {
/// The wrapped graph view.
pub(crate) graph: G,
/// One `(nodes, edges)` pair of id bitmaps per layer, indexed by layer id.
/// Layer ids that were never populated hold two empty bitmaps.
pub(crate) layered_mask: Arc<[(RoaringTreemap, RoaringTreemap)]>,
}

// Marker impl: opts `MaskedGraph` into `Static` for every `G`.
// NOTE(review): the meaning of `Static` is defined elsewhere — confirm it is appropriate here.
impl<G> Static for MaskedGraph<G> {}

// Debug output names the struct and prints only the wrapped graph; the per-layer
// masks are intentionally left out of the output.
impl<G: Debug> Debug for MaskedGraph<G> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MaskedGraph");
        out.field("graph", &self.graph);
        out.finish()
    }
}

// Exposes the wrapped graph as the `Base` of this view.
impl<'graph, G: GraphViewOps<'graph>> Base for MaskedGraph<G> {
type Base = G;
/// Returns a reference to the wrapped graph.
#[inline(always)]
fn base(&self) -> &Self::Base {
&self.graph
}
}

// Marker impl with an empty body.
impl<'graph, G: GraphViewOps<'graph>> Immutable for MaskedGraph<G> {}

// Empty `Inherit*` impls: core ops, time semantics, properties, materialize and
// layer ops are not customised by this view.
// NOTE(review): presumably these forward to `Base::base()` (the wrapped graph) —
// confirm against the trait definitions.
impl<'graph, G: GraphViewOps<'graph>> InheritCoreOps for MaskedGraph<G> {}
impl<'graph, G: GraphViewOps<'graph>> InheritTimeSemantics for MaskedGraph<G> {}
impl<'graph, G: GraphViewOps<'graph>> InheritPropertiesOps for MaskedGraph<G> {}
impl<'graph, G: GraphViewOps<'graph>> InheritMaterialize for MaskedGraph<G> {}
impl<'graph, G: GraphViewOps<'graph>> InheritLayerOps for MaskedGraph<G> {}

impl<'graph, G: GraphViewOps<'graph>> MaskedGraph<G> {
    /// Snapshots `graph` into per-layer node/edge id bitmaps and wraps it.
    ///
    /// For every layer name yielded by `graph.unique_layers()`, the node ids and edge
    /// ids of the corresponding single-layer view are collected into two
    /// `RoaringTreemap`s and stored at the index given by the layer's id. Indices
    /// below the largest layer id that were never written hold empty bitmaps.
    ///
    /// # Panics
    ///
    /// Panics if a yielded layer name cannot be resolved to a layer id or to a
    /// layered view of `graph`.
    pub fn new(graph: G) -> Self {
        let mut per_layer: Vec<(RoaringTreemap, RoaringTreemap)> = Vec::new();

        for layer_name in graph.unique_layers() {
            let layer_id = graph.get_layer_id(&layer_name).unwrap();
            let layer_view = graph.layers(layer_name).unwrap();

            let node_mask: RoaringTreemap = layer_view
                .nodes()
                .into_iter()
                .map(|n| n.node.as_u64())
                .collect();
            let edge_mask: RoaringTreemap = layer_view
                .edges()
                .into_iter()
                .map(|e| e.edge.pid().as_u64())
                .collect();

            // Grow with empty masks so the slot for this layer id exists.
            if layer_id >= per_layer.len() {
                per_layer.resize(layer_id + 1, (RoaringTreemap::new(), RoaringTreemap::new()));
            }
            per_layer[layer_id] = (node_mask, edge_mask);
        }

        Self {
            graph,
            layered_mask: per_layer.into(),
        }
    }
}

// List operations are not customised by this view (empty `InheritListOps` impl).
// FIXME: this should use the list version ideally
impl<'graph, G: GraphViewOps<'graph>> InheritListOps for MaskedGraph<G> {}
impl<'graph, G: GraphViewOps<'graph>> EdgeFilterOps for MaskedGraph<G> {
    /// Always `true`: this view filters edges through its masks.
    #[inline]
    fn edges_filtered(&self) -> bool {
        true
    }

    /// Always `false`: the edge list is not reported as trusted.
    #[inline]
    fn edge_list_trusted(&self) -> bool {
        false
    }

    /// Forwarded unchanged from the wrapped graph.
    #[inline]
    fn edge_filter_includes_node_filter(&self) -> bool {
        self.graph.edge_filter_includes_node_filter()
    }

    /// Returns `true` when at least one of the requested layers has a mask that
    /// contains the edge's id together with both of its endpoints.
    ///
    /// Layer ids without a mask entry are treated as not containing the edge.
    #[inline]
    fn filter_edge(&self, edge: EdgeStorageRef, layer_ids: &LayerIds) -> bool {
        let masks = &self.layered_mask;
        // A layer admits the edge only if the edge and both endpoints are in its mask.
        let admits = |(node_mask, edge_mask): &(RoaringTreemap, RoaringTreemap)| {
            edge_mask.contains(edge.eid().as_u64())
                && node_mask.contains(edge.src().as_u64())
                && node_mask.contains(edge.dst().as_u64())
        };

        match layer_ids {
            LayerIds::None => false,
            LayerIds::All => masks.iter().any(admits),
            LayerIds::One(layer) => masks.get(*layer).is_some_and(admits),
            LayerIds::Multiple(layers) => layers
                .iter()
                .any(|layer| masks.get(layer).is_some_and(admits)),
        }
    }
}

impl<'graph, G: GraphViewOps<'graph>> NodeFilterOps for MaskedGraph<G> {
fn nodes_filtered(&self) -> bool {
true
}
// FIXME: should use list version and make this true
fn node_list_trusted(&self) -> bool {
false
}

#[inline]
fn filter_node(&self, node: NodeStorageRef, layer_ids: &LayerIds) -> bool {
match layer_ids {
LayerIds::None => false,
LayerIds::All => self
.layered_mask
.iter()
.any(|(nodes, _)| nodes.contains(node.vid().as_u64())),
LayerIds::One(id) => self
.layered_mask
.get(*id)
.map(|(nodes, _)| nodes.contains(node.vid().as_u64()))
.unwrap_or(false),
LayerIds::Multiple(multiple) => multiple.iter().any(|id| {
self.layered_mask
.get(id)
.map(|(nodes, _)| nodes.contains(node.vid().as_u64()))
.unwrap_or(false)
}),
}
}
}

// Tests for `MaskedGraph`, run through the `test_storage!` macro.
#[cfg(test)]
mod masked_graph_tests {
use crate::{
algorithms::motifs::triangle_count::triangle_count, db::graph::graph::assert_graph_equal,
prelude::*, test_storage,
};
// NOTE(review): no `Itertools` method appears to be used in this module — confirm
// the import is still needed (e.g. by a macro expansion) before removing it.
use itertools::Itertools;

/// A masked graph containing two nodes and no edges must compare equal to its
/// own materialized copy.
#[test]
fn test_materialize_no_edges() {
let graph = Graph::new();

graph.add_node(1, 1, NO_PROPS, None).unwrap();
graph.add_node(2, 2, NO_PROPS, None).unwrap();

test_storage!(&graph, |graph| {
let sg = graph.masked();

let actual = sg.materialize().unwrap().into_events().unwrap();
assert_graph_equal(&actual, &sg);
});
}

/// Masking a windowed view must not change its triangle count.
#[test]
fn test_mask_the_window_50pc() {
let graph = Graph::new();
// (src, dst, timestamp) triples; timestamps run from 1 to 23.
let edges = vec![
(1, 2, 1),
(1, 3, 2),
(1, 4, 3),
(3, 1, 4),
(3, 4, 5),
(3, 5, 6),
(4, 5, 7),
(5, 6, 8),
(5, 8, 9),
(7, 5, 10),
(8, 5, 11),
(1, 9, 12),
(9, 1, 13),
(6, 3, 14),
(4, 8, 15),
(8, 3, 16),
(5, 10, 17),
(10, 5, 18),
(10, 8, 19),
(1, 11, 20),
(11, 1, 21),
(9, 11, 22),
(11, 9, 23),
];
for (src, dst, ts) in edges {
graph.add_edge(ts, src, dst, NO_PROPS, None).unwrap();
}
test_storage!(&graph, |graph| {
// The mask is built from the `window(12, 24)` view, so both views should
// produce the same triangle count.
let window = graph.window(12, 24);
let mask = window.masked();
let ts = triangle_count(&mask, None);
let tg = triangle_count(&window, None);
assert_eq!(ts, tg)
});
}
}
1 change: 1 addition & 0 deletions raphtory/src/db/graph/views/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod deletion_graph;
pub mod layer_graph;
pub mod masked_graph;
pub mod node_subgraph;
pub mod node_type_filtered_subgraph;
pub mod property_filter;
Expand Down
9 changes: 8 additions & 1 deletion raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
},
};
use pometry_storage::{edge::Edge, tprops::DiskTProp};
use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, storage::timeindex::TimeIndexEntry};
use raphtory_api::core::{
entities::{edges::edge_ref::EdgeRef, EID},
storage::timeindex::TimeIndexEntry,
};
use rayon::prelude::*;
use std::{iter, ops::Range};

Expand Down Expand Up @@ -48,6 +51,10 @@ impl<'a> EdgeStorageOps<'a> for Edge<'a> {
self.dst_id()
}

/// Returns the id of this edge, which for this edge type is the value of `pid()`.
fn eid(self) -> EID {
self.pid()
}

/// Builds an outgoing `EdgeRef` from this edge's id, source id and destination id.
fn out_ref(self) -> EdgeRef {
EdgeRef::new_outgoing(self.eid(), self.src_id(), self.dst_id())
}
Expand Down
6 changes: 1 addition & 5 deletions raphtory/src/disk_graph/graph_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,13 @@ mod test {

use itertools::Itertools;
use proptest::{prelude::*, sample::size_range};
use raphtory_api::core::{entities::VID, storage::timeindex::TimeIndexOps};
use rayon::prelude::*;
use tempfile::TempDir;

use pometry_storage::{graph::TemporalGraph, properties::Properties};

use crate::{
db::api::{
storage::graph::{nodes::node_storage_ops::NodeStorageOps, storage_ops::GraphStorage},
view::{internal::CoreGraphOps, StaticGraphViewOps},
},
db::api::{storage::graph::storage_ops::GraphStorage, view::StaticGraphViewOps},
disk_graph::Time,
prelude::*,
};
Expand Down
Loading

0 comments on commit 58af97c

Please sign in to comment.