diff --git a/Cargo.lock b/Cargo.lock index 423f190f8..835dae15e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4863,6 +4863,7 @@ dependencies = [ "rayon", "regex", "reqwest 0.12.9", + "roaring", "rustc-hash 2.0.0", "serde", "serde_json", @@ -5259,6 +5260,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "roaring" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" +dependencies = [ + "bytemuck", + "byteorder", +] + [[package]] name = "ron" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index c1d02e52e..014018e2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ numpy = "0.22.1" itertools = "0.13.0" rand = "0.8.5" rayon = "1.8.1" +roaring = "0.10.6" sorted_vector_map = "0.2.0" tokio = { version = "1.36.0", features = ["full"] } once_cell = "1.19.0" diff --git a/pometry-storage-private b/pometry-storage-private index 5c57e030b..4312243c3 160000 --- a/pometry-storage-private +++ b/pometry-storage-private @@ -1 +1 @@ -Subproject commit 5c57e030bd9eedc2f645aad38fb8243bffd1d990 +Subproject commit 4312243c364254798c673c3681e8a5425204fd8e diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 246d42176..50655e6aa 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -77,6 +77,7 @@ pometry-storage = { workspace = true, optional = true } prost = { workspace = true, optional = true } prost-types = { workspace = true, optional = true } +roaring ={ workspace = true } [dev-dependencies] csv = { workspace = true } diff --git a/raphtory/src/db/api/storage/graph/edges/edge_entry.rs b/raphtory/src/db/api/storage/graph/edges/edge_entry.rs index e239952b1..db9467701 100644 --- a/raphtory/src/db/api/storage/graph/edges/edge_entry.rs +++ b/raphtory/src/db/api/storage/graph/edges/edge_entry.rs @@ -13,6 +13,7 @@ use crate::{ tprop_storage_ops::TPropOps, }, }; +use raphtory_api::core::entities::EID; use rayon::prelude::*; use std::ops::Range; @@ -60,6 +61,10 @@ impl<'a, 'b: 'a> EdgeStorageOps<'a> for &'a EdgeStorageEntry<'b> { self.as_ref().dst() } + fn eid(self) -> EID { + self.as_ref().eid() + } + fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator + 'a { self.as_ref().layer_ids_iter(layer_ids) } diff --git a/raphtory/src/db/api/storage/graph/edges/edge_ref.rs b/raphtory/src/db/api/storage/graph/edges/edge_ref.rs index b3b227a8c..6323a6dc4 100644 --- a/raphtory/src/db/api/storage/graph/edges/edge_ref.rs +++ b/raphtory/src/db/api/storage/graph/edges/edge_ref.rs @@ -9,6 +9,7 @@ use crate::{ tprop_storage_ops::TPropOps, }, }; +use raphtory_api::core::entities::EID; use rayon::prelude::*; use std::ops::Range; @@ -74,6 +75,10 @@ impl<'a> EdgeStorageOps<'a> for EdgeStorageRef<'a> { for_all!(self, edge => edge.dst()) } + fn eid(self) -> EID { + for_all!(self, edge => edge.eid()) + } + fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator + 'a { for_all_iter!(self, edge => EdgeStorageOps::layer_ids_iter(edge, layer_ids)) } diff --git a/raphtory/src/db/api/storage/graph/edges/edge_storage_ops.rs b/raphtory/src/db/api/storage/graph/edges/edge_storage_ops.rs index 51d4f8bde..f1f87a83a 100644 --- a/raphtory/src/db/api/storage/graph/edges/edge_storage_ops.rs +++ b/raphtory/src/db/api/storage/graph/edges/edge_storage_ops.rs @@ -150,6 +150,7 @@ pub trait EdgeStorageOps<'a>: Copy + Sized + Send + Sync + 'a { fn has_layer(self, layer_ids: &LayerIds) -> bool; fn src(self) -> VID; fn dst(self) -> VID; + fn eid(self) -> EID; fn layer_ids_iter(self, layer_ids: &LayerIds) -> impl Iterator + 'a; @@ -325,6 +326,10 @@ impl<'a> EdgeStorageOps<'a> for MemEdge<'a> { self.edge_store().dst } + fn eid(self) -> EID { + self.eid() + } + fn out_ref(self) -> EdgeRef { EdgeRef::new_outgoing(self.eid(), self.src(), self.dst()) } diff --git a/raphtory/src/db/api/view/graph.rs b/raphtory/src/db/api/view/graph.rs index 65dbc2d75..80e0dbc18 100644 --- a/raphtory/src/db/api/view/graph.rs +++ b/raphtory/src/db/api/view/graph.rs @@ -22,7 +22,8 @@ use crate::{ node::NodeView, nodes::Nodes, views::{ - node_subgraph::NodeSubgraph, node_type_filtered_subgraph::TypeFilteredSubgraph, + masked_graph::MaskedGraph, node_subgraph::NodeSubgraph, + node_type_filtered_subgraph::TypeFilteredSubgraph, }, }, }, @@ -66,6 +67,8 @@ pub trait GraphViewOps<'graph>: BoxableGraphView + Sized + Clone + 'graph { fn subgraph, V: AsNodeRef>(&self, nodes: I) -> NodeSubgraph; + fn masked(&self) -> MaskedGraph; + fn subgraph_node_types, V: Borrow>( &self, nodes_types: I, @@ -345,6 +348,10 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph> NodeSubgraph::new(self.clone(), nodes) } + fn masked(&self) -> MaskedGraph { + MaskedGraph::new(self.clone()) + } + fn subgraph_node_types, V: Borrow>( &self, nodes_types: I, diff --git a/raphtory/src/db/graph/views/masked_graph.rs b/raphtory/src/db/graph/views/masked_graph.rs new file mode 100644 index 000000000..9d072b3a5 --- /dev/null +++ b/raphtory/src/db/graph/views/masked_graph.rs @@ -0,0 +1,211 @@ +use crate::{ + core::entities::LayerIds, + db::api::{ + properties::internal::InheritPropertiesOps, + storage::graph::{ + edges::{edge_ref::EdgeStorageRef, edge_storage_ops::EdgeStorageOps}, + nodes::{node_ref::NodeStorageRef, node_storage_ops::NodeStorageOps}, + }, + view::internal::{ + Base, EdgeFilterOps, Immutable, InheritCoreOps, InheritLayerOps, InheritListOps, + InheritMaterialize, InheritTimeSemantics, NodeFilterOps, Static, + }, + }, + prelude::{GraphViewOps, LayerOps}, +}; +use roaring::RoaringTreemap; +use std::{ + fmt::{Debug, Formatter}, + sync::Arc, +}; + +#[derive(Clone)] +pub struct MaskedGraph { + pub(crate) graph: G, + pub(crate) layered_mask: Arc<[(RoaringTreemap, RoaringTreemap)]>, +} + +impl Static for MaskedGraph {} + +impl<'graph, G: Debug + 'graph> Debug for MaskedGraph { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MaskedGraph") + .field("graph", &self.graph as &dyn Debug) + .finish() + } +} + +impl<'graph, G: GraphViewOps<'graph>> Base for MaskedGraph { + type Base = G; + #[inline(always)] + fn base(&self) -> &Self::Base { + &self.graph + } +} + +impl<'graph, G: GraphViewOps<'graph>> Immutable for MaskedGraph {} + +impl<'graph, G: GraphViewOps<'graph>> InheritCoreOps for MaskedGraph {} +impl<'graph, G: GraphViewOps<'graph>> InheritTimeSemantics for MaskedGraph {} +impl<'graph, G: GraphViewOps<'graph>> InheritPropertiesOps for MaskedGraph {} +impl<'graph, G: GraphViewOps<'graph>> InheritMaterialize for MaskedGraph {} +impl<'graph, G: GraphViewOps<'graph>> InheritLayerOps for MaskedGraph {} + +impl<'graph, G: GraphViewOps<'graph>> MaskedGraph { + pub fn new(graph: G) -> Self { + let mut layered_masks = vec![]; + for l_name in graph.unique_layers() { + let l_id = graph.get_layer_id(&l_name).unwrap(); + let layer_g = graph.layers(l_name).unwrap(); + let nodes = layer_g.nodes(); + let edges = layer_g.edges(); + + let nodes: RoaringTreemap = nodes.into_iter().map(|id| id.node.as_u64()).collect(); + let edges: RoaringTreemap = + edges.into_iter().map(|id| id.edge.pid().as_u64()).collect(); + + if layered_masks.len() < l_id + 1 { + layered_masks.resize(l_id + 1, (RoaringTreemap::new(), RoaringTreemap::new())); + } + + layered_masks[l_id] = (nodes, edges); + } + + Self { + graph, + layered_mask: layered_masks.into(), + } + } +} + +// FIXME: this should use the list version ideally +impl<'graph, G: GraphViewOps<'graph>> InheritListOps for MaskedGraph {} +impl<'graph, G: GraphViewOps<'graph>> EdgeFilterOps for MaskedGraph { + #[inline] + fn edges_filtered(&self) -> bool { + true + } + + #[inline] + fn edge_list_trusted(&self) -> bool { + false + } + + #[inline] + fn edge_filter_includes_node_filter(&self) -> bool { + self.graph.edge_filter_includes_node_filter() + } + + #[inline] + fn filter_edge(&self, edge: EdgeStorageRef, layer_ids: &LayerIds) -> bool { + let filter_fn = |(nodes, edges): &(RoaringTreemap, RoaringTreemap)| { + edges.contains(edge.eid().as_u64()) + && nodes.contains(edge.src().as_u64()) + && nodes.contains(edge.dst().as_u64()) + }; + match layer_ids { + LayerIds::None => false, + LayerIds::All => self.layered_mask.iter().any(filter_fn), + LayerIds::One(id) => self.layered_mask.get(*id).map(filter_fn).unwrap_or(false), + LayerIds::Multiple(multiple) => multiple + .iter() + .any(|id| self.layered_mask.get(id).map(filter_fn).unwrap_or(false)), + } + } +} + +impl<'graph, G: GraphViewOps<'graph>> NodeFilterOps for MaskedGraph { + fn nodes_filtered(&self) -> bool { + true + } + // FIXME: should use list version and make this true + fn node_list_trusted(&self) -> bool { + false + } + + #[inline] + fn filter_node(&self, node: NodeStorageRef, layer_ids: &LayerIds) -> bool { + match layer_ids { + LayerIds::None => false, + LayerIds::All => self + .layered_mask + .iter() + .any(|(nodes, _)| nodes.contains(node.vid().as_u64())), + LayerIds::One(id) => self + .layered_mask + .get(*id) + .map(|(nodes, _)| nodes.contains(node.vid().as_u64())) + .unwrap_or(false), + LayerIds::Multiple(multiple) => multiple.iter().any(|id| { + self.layered_mask + .get(id) + .map(|(nodes, _)| nodes.contains(node.vid().as_u64())) + .unwrap_or(false) + }), + } + } +} + +#[cfg(test)] +mod masked_graph_tests { + use crate::{ + algorithms::motifs::triangle_count::triangle_count, db::graph::graph::assert_graph_equal, + prelude::*, test_storage, + }; + use itertools::Itertools; + + #[test] + fn test_materialize_no_edges() { + let graph = Graph::new(); + + graph.add_node(1, 1, NO_PROPS, None).unwrap(); + graph.add_node(2, 2, NO_PROPS, None).unwrap(); + + test_storage!(&graph, |graph| { + let sg = graph.masked(); + + let actual = sg.materialize().unwrap().into_events().unwrap(); + assert_graph_equal(&actual, &sg); + }); + } + + #[test] + fn test_mask_the_window_50pc() { + let graph = Graph::new(); + let edges = vec![ + (1, 2, 1), + (1, 3, 2), + (1, 4, 3), + (3, 1, 4), + (3, 4, 5), + (3, 5, 6), + (4, 5, 7), + (5, 6, 8), + (5, 8, 9), + (7, 5, 10), + (8, 5, 11), + (1, 9, 12), + (9, 1, 13), + (6, 3, 14), + (4, 8, 15), + (8, 3, 16), + (5, 10, 17), + (10, 5, 18), + (10, 8, 19), + (1, 11, 20), + (11, 1, 21), + (9, 11, 22), + (11, 9, 23), + ]; + for (src, dst, ts) in edges { + graph.add_edge(ts, src, dst, NO_PROPS, None).unwrap(); + } + test_storage!(&graph, |graph| { + let window = graph.window(12, 24); + let mask = window.masked(); + let ts = triangle_count(&mask, None); + let tg = triangle_count(&window, None); + assert_eq!(ts, tg) + }); + } +} diff --git a/raphtory/src/db/graph/views/mod.rs b/raphtory/src/db/graph/views/mod.rs index 9ae9eb378..cea08f8d8 100644 --- a/raphtory/src/db/graph/views/mod.rs +++ b/raphtory/src/db/graph/views/mod.rs @@ -1,5 +1,6 @@ pub mod deletion_graph; pub mod layer_graph; +pub mod masked_graph; pub mod node_subgraph; pub mod node_type_filtered_subgraph; pub mod property_filter; diff --git a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs index f0c124ae7..277446185 100644 --- a/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs +++ b/raphtory/src/disk_graph/graph_impl/edge_storage_ops.rs @@ -11,7 +11,10 @@ use crate::{ }, }; use pometry_storage::{edge::Edge, tprops::DiskTProp}; -use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, storage::timeindex::TimeIndexEntry}; +use raphtory_api::core::{ + entities::{edges::edge_ref::EdgeRef, EID}, + storage::timeindex::TimeIndexEntry, +}; use rayon::prelude::*; use std::{iter, ops::Range}; @@ -48,6 +51,10 @@ impl<'a> EdgeStorageOps<'a> for Edge<'a> { self.dst_id() } + fn eid(self) -> EID { + self.pid() + } + fn out_ref(self) -> EdgeRef { EdgeRef::new_outgoing(self.eid(), self.src_id(), self.dst_id()) } diff --git a/raphtory/src/disk_graph/graph_impl/mod.rs b/raphtory/src/disk_graph/graph_impl/mod.rs index 5444b9967..08222302a 100644 --- a/raphtory/src/disk_graph/graph_impl/mod.rs +++ b/raphtory/src/disk_graph/graph_impl/mod.rs @@ -35,17 +35,13 @@ mod test { use itertools::Itertools; use proptest::{prelude::*, sample::size_range}; - use raphtory_api::core::{entities::VID, storage::timeindex::TimeIndexOps}; use rayon::prelude::*; use tempfile::TempDir; use pometry_storage::{graph::TemporalGraph, properties::Properties}; use crate::{ - db::api::{ - storage::graph::{nodes::node_storage_ops::NodeStorageOps, storage_ops::GraphStorage}, - view::{internal::CoreGraphOps, StaticGraphViewOps}, - }, + db::api::{storage::graph::storage_ops::GraphStorage, view::StaticGraphViewOps}, disk_graph::Time, prelude::*, }; diff --git a/raphtory/src/disk_graph/storage_interface/node.rs b/raphtory/src/disk_graph/storage_interface/node.rs index 1dd6bb59f..d35812cea 100644 --- a/raphtory/src/disk_graph/storage_interface/node.rs +++ b/raphtory/src/disk_graph/storage_interface/node.rs @@ -268,7 +268,7 @@ impl<'a> NodeStorageOps<'a> for DiskNode<'a> { LayerIds::All => self .graph .find_edge(self.vid, dst) - .map(|e| EdgeRef::new_outgoing(e.eid(), self.vid, dst)), + .map(|e| EdgeRef::new_outgoing(e.pid(), self.vid, dst)), LayerIds::One(id) => { let eid = self.graph.layers()[*id] .nodes_storage() diff --git a/raphtory/src/python/graph/views/graph_view.rs b/raphtory/src/python/graph/views/graph_view.rs index 47096dd30..8909b762b 100644 --- a/raphtory/src/python/graph/views/graph_view.rs +++ b/raphtory/src/python/graph/views/graph_view.rs @@ -18,6 +18,7 @@ use crate::{ nodes::Nodes, views::{ layer_graph::LayeredGraph, + masked_graph::MaskedGraph, node_subgraph::NodeSubgraph, node_type_filtered_subgraph::TypeFilteredSubgraph, property_filter::{ @@ -106,6 +107,12 @@ impl IntoPy for NodeSubgraph { } } +impl IntoPy for MaskedGraph { + fn into_py(self, py: Python<'_>) -> PyObject { + PyGraphView::from(self).into_py(py) + } +} + impl IntoPy for TypeFilteredSubgraph { fn into_py(self, py: Python<'_>) -> PyObject { PyGraphView::from(self).into_py(py) @@ -349,6 +356,16 @@ impl PyGraphView { self.graph.subgraph(nodes) } + /// Applies the filters to the graph and retains the node ids and the edge ids + /// in the graph that satisfy the filters + /// creates bitsets per layer for nodes and edges + /// + /// Returns: + /// MaskedGraph: Returns the masked graph + fn masked_graph(&self) -> MaskedGraph { + self.graph.masked() + } + /// Returns a subgraph filtered by node types given a set of node types /// /// Arguments: