Skip to content

Commit

Permalink
change index to use RoaringTreeMap instead so it is more memory efficient
Browse files Browse the repository at this point in the history
  • Loading branch information
ljeub-pometry committed Dec 20, 2024
1 parent d83105b commit d8519f8
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 74 deletions.
2 changes: 1 addition & 1 deletion raphtory/src/algorithms/components/in_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn in_component<'graph, G: GraphViewOps<'graph>>(
node.graph.clone(),
node.graph.clone(),
distances,
Some(Index::new(nodes, node.graph.unfiltered_num_nodes())),
Some(Index::new(nodes)),
)
}

Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/algorithms/components/out_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn out_component<'graph, G: GraphViewOps<'graph>>(
node.graph.clone(),
node.graph.clone(),
distances,
Some(Index::new(nodes, node.graph.unfiltered_num_nodes())),
Some(Index::new(nodes)),
)
}

Expand Down
5 changes: 1 addition & 4 deletions raphtory/src/db/api/state/lazy_node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ impl<'graph, Op: NodeOp + 'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'gra
self.nodes.base_graph.clone(),
self.nodes.graph.clone(),
values,
Some(Index::new(
keys,
self.nodes.base_graph.unfiltered_num_nodes(),
)),
Some(Index::new(keys)),
)
} else {
let values = self.collect_vec();
Expand Down
76 changes: 39 additions & 37 deletions raphtory/src/db/api/state/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,63 +7,66 @@ use crate::{
prelude::GraphViewOps,
};
use rayon::{iter::Either, prelude::*};
use roaring::RoaringTreemap;
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};

#[derive(Clone, Debug)]
pub struct Index<K> {
keys: Arc<[K]>,
map: Arc<[bool]>,
pub(crate) set: Arc<RoaringTreemap>,
len: usize, // cache the len for the set as it is recomputed every time
_phantom: PhantomData<K>,
}

impl<K: Copy + Into<usize>> Index<K> {
pub fn new(keys: impl Into<Arc<[K]>>, n: usize) -> Self {
let keys = keys.into();
let mut map = vec![false; n];
for k in keys.iter().copied() {
map[k.into()] = true;
}
impl<K: Copy + Eq + Hash + Into<usize> + From<usize> + Send + Sync> Index<K> {
pub fn new(keys: impl IntoIterator<Item = K>) -> Self {
let set: Arc<RoaringTreemap> =
Arc::new(keys.into_iter().map(|k| k.into() as u64).collect());
let len = set.len() as usize;
Self {
keys,
map: map.into(),
set,
len,
_phantom: PhantomData,
}
}
}

impl<K: Copy + Ord + Into<usize> + Send + Sync> Index<K> {
pub fn iter(&self) -> impl Iterator<Item = &K> + '_ {
self.keys.iter()
pub fn iter(&self) -> impl Iterator<Item = K> + '_ {
self.set.iter().map(|k| K::from(k as usize))
}

pub fn into_par_iter(self) -> impl IndexedParallelIterator<Item = K> {
let keys = self.keys;
(0..keys.len()).into_par_iter().map(move |i| keys[i])
(0..self.len())
.into_par_iter()
.map(move |i| self.key(i).unwrap())
}

pub fn into_iter(self) -> impl Iterator<Item = K> {
let keys = self.keys;
(0..keys.len()).map(move |i| keys[i])
(0..self.len()).map(move |i| self.key(i).unwrap())
}

/// Returns the position of `key` within the index, or `None` if `key` is not a member.
///
/// Positions follow the ascending order of the stored ids, so for every member
/// `self.key(self.index(&k).unwrap()) == Some(k)`.
pub fn index(&self, key: &K) -> Option<usize> {
    let id = (*key).into() as u64;
    // `rank(id)` counts the stored values that are `<= id`. For a member this is its
    // zero-based position plus one, so membership must be checked first and the rank
    // shifted down by one. Without the membership check, absent keys smaller than the
    // maximum would be reported as present, and the largest key would be reported as
    // missing.
    self.set
        .contains(id)
        .then(|| (self.set.rank(id) - 1) as usize)
}

pub fn key(&self, index: usize) -> Option<K> {
self.keys.get(index).copied()
self.set.select(index as u64).map(|k| K::from(k as usize))
}

pub fn len(&self) -> usize {
self.keys.len()
self.len
}

pub fn contains(&self, key: &K) -> bool {
self.map.get((*key).into()).copied().unwrap_or(false)
self.set.contains((*key).into() as u64)
}
}

impl<K: Copy + Hash + Eq + Send + Sync> Index<K> {
pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = &K> + '_ {
self.keys.par_iter()
pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = K> + '_ {
(0..self.len())
.into_par_iter()
.map(move |i| self.key(i).unwrap())
}
}

Expand Down Expand Up @@ -160,12 +163,11 @@ impl<
{
match &self.keys {
Some(index) => index
.keys
.iter()
.zip(self.values.iter())
.map(|(n, v)| {
(
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, *n),
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, n),
v,
)
})
Expand Down Expand Up @@ -199,14 +201,14 @@ impl<
'graph: 'a,
{
match &self.keys {
Some(index) => Either::Left(index.keys.par_iter().zip(self.values.par_iter()).map(
|(n, v)| {
Some(index) => {
Either::Left(index.par_iter().zip(self.values.par_iter()).map(|(n, v)| {
(
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, *n),
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, n),
v,
)
},
)),
}))
}
None => Either::Right(self.values.par_iter().enumerate().map(|(i, v)| {
(
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, VID(i)),
Expand All @@ -221,9 +223,9 @@ impl<
index: usize,
) -> Option<(NodeView<&Self::BaseGraph, &Self::Graph>, Self::Value<'_>)> {
match &self.keys {
Some(node_index) => node_index.keys.get(index).map(|n| {
Some(node_index) => node_index.key(index).map(|n| {
(
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, *n),
NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, n),
&self.values[index],
)
}),
Expand Down
6 changes: 3 additions & 3 deletions raphtory/src/db/api/state/node_state_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
Some(Index::new(keys)),
)
}
}
Expand All @@ -137,7 +137,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
Some(Index::new(keys)),
)
}

Expand Down Expand Up @@ -174,7 +174,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
Some(Index::new(keys)),
)
}

Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/api/storage/graph/storage_ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl GraphStorage {
let nodes_storage = self.nodes();
nodes
.par_iter()
.filter(|vid| view.filter_node(nodes_storage.node(**vid), layer_ids))
.filter(|&vid| view.filter_node(nodes_storage.node(vid), layer_ids))
.count()
}
}
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/api/view/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
.count(),
NodeList::List { nodes } => nodes
.par_iter()
.filter(move |&&id| self.filter_node(core_nodes.node_entry(id), layer_ids))
.filter(move |&id| self.filter_node(core_nodes.node_entry(id), layer_ids))
.count(),
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/api/view/internal/list_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl NodeList {
pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = VID> + '_ {
match self {
NodeList::All { num_nodes } => Either::Left((0..*num_nodes).into_par_iter().map(VID)),
NodeList::List { nodes } => Either::Right(nodes.par_iter().copied()),
NodeList::List { nodes } => Either::Right(nodes.par_iter()),
}
}

Expand All @@ -56,7 +56,7 @@ impl NodeList {
pub fn iter(&self) -> impl Iterator<Item = VID> + '_ {
match self {
NodeList::All { num_nodes } => Either::Left((0..*num_nodes).map(VID)),
NodeList::List { nodes } => Either::Right(nodes.iter().copied()),
NodeList::List { nodes } => Either::Right(nodes.iter()),
}
}

Expand Down
62 changes: 46 additions & 16 deletions raphtory/src/db/graph/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ use crate::{
prelude::*,
};

use crate::db::{api::state::NodeOp, graph::create_node_type_filter};
use crate::db::{
api::state::{Index, NodeOp},
graph::{create_node_type_filter, views::node_subgraph::NodeSubgraph},
};
use either::Either;
use rayon::iter::ParallelIterator;
use std::{
fmt::{Debug, Formatter},
Expand All @@ -26,6 +30,7 @@ use std::{
pub struct Nodes<'graph, G, GH = G> {
pub(crate) base_graph: G,
pub(crate) graph: GH,
pub(crate) nodes: Option<Index<VID>>,
pub(crate) node_types_filter: Option<Arc<[bool]>>,
_marker: PhantomData<&'graph ()>,
}
Expand All @@ -49,6 +54,7 @@ where
Nodes {
base_graph,
graph,
nodes: value.nodes,
node_types_filter: value.node_types_filter,
_marker: PhantomData,
}
Expand All @@ -64,6 +70,7 @@ where
Self {
base_graph,
graph,
nodes: None,
node_types_filter: None,
_marker: PhantomData,
}
Expand Down Expand Up @@ -92,10 +99,16 @@ where
G: GraphViewOps<'graph> + 'graph,
GH: GraphViewOps<'graph> + 'graph,
{
pub fn new_filtered(base_graph: G, graph: GH, node_types_filter: Option<Arc<[bool]>>) -> Self {
pub fn new_filtered(
base_graph: G,
graph: GH,
nodes: Option<Index<VID>>,
node_types_filter: Option<Arc<[bool]>>,
) -> Self {
Self {
base_graph,
graph,
nodes,
node_types_filter,
_marker: PhantomData,
}
Expand All @@ -104,19 +117,36 @@ where
pub(crate) fn par_iter_refs(&self) -> impl ParallelIterator<Item = VID> + 'graph {
let g = self.graph.core_graph().lock();
let node_types_filter = self.node_types_filter.clone();
g.into_nodes_par(self.graph.clone(), node_types_filter)
match self.nodes.clone() {
None => Either::Left(g.into_nodes_par(self.graph.clone(), node_types_filter)),
Some(nodes) => {
let gs = NodeSubgraph {
graph: self.graph.clone(),
nodes,
};
Either::Right(g.into_nodes_par(gs, node_types_filter))
}
}
}

#[inline]
pub(crate) fn iter_refs(&self) -> impl Iterator<Item = VID> + Send + Sync + 'graph {
let g = self.graph.core_graph().lock();
let node_types_filter = self.node_types_filter.clone();
g.into_nodes_iter(self.graph.clone(), node_types_filter)
match self.nodes.clone() {
None => g.into_nodes_iter(self.graph.clone(), node_types_filter),
Some(nodes) => {
let gs = NodeSubgraph {
graph: self.graph.clone(),
nodes,
};
g.into_nodes_iter(gs, node_types_filter)
}
}
}

pub fn iter(&self) -> impl Iterator<Item = NodeView<&G, &GH>> + '_ {
let cg = self.graph.core_graph().lock();
cg.into_nodes_iter(&self.graph, self.node_types_filter.clone())
pub fn iter(&self) -> impl Iterator<Item = NodeView<&G, &GH>> + use<'_, 'graph, G, GH> {
self.iter_refs()
.map(|v| NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, v))
}

Expand All @@ -128,19 +158,17 @@ where
.into_dyn_boxed()
}

pub fn par_iter(&self) -> impl ParallelIterator<Item = NodeView<&G, &GH>> + '_ {
let cg = self.graph.core_graph().lock();
let node_types_filter = self.node_types_filter.clone();
cg.into_nodes_par(&self.graph, node_types_filter)
pub fn par_iter(
&self,
) -> impl ParallelIterator<Item = NodeView<&G, &GH>> + use<'_, 'graph, G, GH> {
self.par_iter_refs()
.map(|v| NodeView::new_one_hop_filtered(&self.base_graph, &self.graph, v))
}

pub fn into_par_iter(self) -> impl ParallelIterator<Item = NodeView<G, GH>> + 'graph {
let cg = self.graph.core_graph().lock();
cg.into_nodes_par(self.graph.clone(), self.node_types_filter)
.map(move |n| {
NodeView::new_one_hop_filtered(self.base_graph.clone(), self.graph.clone(), n)
})
self.par_iter_refs().map(move |n| {
NodeView::new_one_hop_filtered(self.base_graph.clone(), self.graph.clone(), n)
})
}

/// Returns the number of nodes in the graph.
Expand Down Expand Up @@ -170,6 +198,7 @@ where
Nodes {
base_graph: self.base_graph.clone(),
graph: self.graph.clone(),
nodes: self.nodes.clone(),
node_types_filter,
_marker: PhantomData,
}
Expand Down Expand Up @@ -281,6 +310,7 @@ where
Nodes {
base_graph,
graph: filtered_graph,
nodes: self.nodes.clone(),
node_types_filter: self.node_types_filter.clone(),
_marker: PhantomData,
}
Expand Down
9 changes: 3 additions & 6 deletions raphtory/src/db/graph/views/node_subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ impl<'graph, G: GraphViewOps<'graph>> NodeSubgraph<G> {
let nodes = nodes
.into_iter()
.flat_map(|v| graph.internalise_node(v.as_node_ref()));
let mut nodes: Vec<_> = if graph.nodes_filtered() {
nodes.filter(|n| graph.has_node(*n)).collect()
let nodes = if graph.nodes_filtered() {
Index::new(nodes.filter(|n| graph.has_node(*n)))
} else {
nodes.collect()
Index::new(nodes)
};
nodes.sort();
nodes.dedup();
let nodes = Index::new(nodes, graph.unfiltered_num_nodes());
Self { graph, nodes }
}
}
Expand Down
Loading

0 comments on commit d8519f8

Please sign in to comment.