Skip to content

Commit

Permalink
improve subgraph count_nodes performance (#1869)
Browse files Browse the repository at this point in the history
* improve subgraph count_nodes performance

* clean up warnings

* try to optimise index for filtering
  • Loading branch information
ljeub-pometry authored Nov 26, 2024
1 parent 3fc1777 commit 4672baa
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 86 deletions.
13 changes: 4 additions & 9 deletions raphtory-graphql/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,30 +248,25 @@ impl Data {

#[cfg(test)]
pub(crate) mod data_tests {
use super::ValidGraphFolder;
use crate::{
config::app_config::{AppConfig, AppConfigBuilder},
data::Data,
};
use itertools::Itertools;
use raphtory::{core::utils::errors::GraphError, db::api::view::MaterializedGraph, prelude::*};
use std::{
collections::HashMap,
fs,
fs::File,
io,
path::{Path, PathBuf},
};
use std::{collections::HashMap, fs, fs::File, io, path::Path};

#[cfg(feature = "storage")]
use raphtory::{
db::api::storage::graph::storage_ops::GraphStorage, db::api::view::internal::CoreGraphOps,
disk_graph::DiskGraphStorage,
};
#[cfg(feature = "storage")]
use std::path::PathBuf;
#[cfg(feature = "storage")]
use std::{thread, time::Duration};

use super::ValidGraphFolder;

#[cfg(feature = "storage")]
fn copy_dir_recursive(source_dir: &Path, target_dir: &Path) -> Result<(), GraphError> {
fs::create_dir_all(target_dir)?;
Expand Down
6 changes: 3 additions & 3 deletions raphtory-graphql/src/model/graph/nodes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::model::graph::{node::Node, property::GqlPropValue, FilterCondition, Operator};
use dynamic_graphql::{Enum, InputObject, ResolvedObject, ResolvedObjectFields};
use crate::model::graph::{node::Node, FilterCondition, Operator};
use dynamic_graphql::{ResolvedObject, ResolvedObjectFields};
use raphtory::{
core::utils::errors::GraphError,
db::{api::view::DynamicGraph, graph::nodes::Nodes},
prelude::{GraphViewOps, *},
prelude::*,
};

#[derive(ResolvedObject)]
Expand Down
17 changes: 7 additions & 10 deletions raphtory/src/db/api/mutation/import_ops.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use raphtory_api::core::entities::{GID, VID};
use std::{borrow::Borrow, fmt::Debug};

use super::time_from_input;
use crate::{
core::{
entities::{
nodes::node_ref::{AsNodeRef, NodeRef},
LayerIds,
},
entities::{nodes::node_ref::AsNodeRef, LayerIds},
utils::errors::{
GraphError,
GraphError::{EdgeExistsError, NodeExistsError},
Expand All @@ -23,9 +18,11 @@ use crate::{
},
prelude::{AdditionOps, EdgeViewOps, GraphViewOps, NodeViewOps},
};
use raphtory_api::core::storage::{arc_str::OptionAsStr, timeindex::AsTime};

use super::time_from_input;
use raphtory_api::core::{
entities::GID,
storage::{arc_str::OptionAsStr, timeindex::AsTime},
};
use std::{borrow::Borrow, fmt::Debug};

pub trait ImportOps:
StaticGraphViewOps
Expand Down
7 changes: 5 additions & 2 deletions raphtory/src/db/api/state/lazy_node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
api::{
state::{
ops::{node::NodeOp, NodeOpFilter},
NodeState, NodeStateOps,
Index, NodeState, NodeStateOps,
},
view::{
internal::{NodeList, OneHopFilter},
Expand Down Expand Up @@ -97,7 +97,10 @@ impl<'graph, Op: NodeOp + 'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'gra
self.nodes.base_graph.clone(),
self.nodes.graph.clone(),
values,
Some(keys.into()),
Some(Index::new(
keys,
self.nodes.base_graph.unfiltered_num_nodes(),
)),
)
} else {
let values = self.collect_vec();
Expand Down
45 changes: 17 additions & 28 deletions raphtory/src/db/api/state/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,29 @@ use crate::{
prelude::GraphViewOps,
};
use rayon::{iter::Either, prelude::*};
use std::{
borrow::Borrow, collections::HashMap, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc,
};
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};

#[derive(Clone, Debug)]
pub struct Index<K> {
keys: Arc<[K]>,
map: Arc<HashMap<K, usize>>,
map: Arc<[bool]>,
}

impl<K: Copy + Hash + Eq> From<Vec<K>> for Index<K> {
fn from(keys: Vec<K>) -> Self {
let map = keys
.iter()
.copied()
.enumerate()
.map(|(i, k)| (k, i))
.collect();
impl<K: Copy + Into<usize>> Index<K> {
pub fn new(keys: impl Into<Arc<[K]>>, n: usize) -> Self {
let keys = keys.into();
let mut map = vec![false; n];
for k in keys.iter().copied() {
map[k.into()] = true;
}
Self {
keys: keys.into(),
map: Arc::new(map),
keys,
map: map.into(),
}
}
}

impl<K: Copy + Hash + Eq + Send + Sync> Index<K> {
impl<K: Copy + Ord + Into<usize> + Send + Sync> Index<K> {
pub fn iter(&self) -> impl Iterator<Item = &K> + '_ {
self.keys.iter()
}
Expand All @@ -47,12 +44,8 @@ impl<K: Copy + Hash + Eq + Send + Sync> Index<K> {
(0..keys.len()).map(move |i| keys[i])
}

pub fn index<Q: ?Sized>(&self, key: &Q) -> Option<usize>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
self.map.get(key).copied()
pub fn index(&self, key: &K) -> Option<usize> {
self.keys.binary_search(key).ok()
}

pub fn key(&self, index: usize) -> Option<K> {
Expand All @@ -63,12 +56,8 @@ impl<K: Copy + Hash + Eq + Send + Sync> Index<K> {
self.keys.len()
}

pub fn contains<Q: ?Sized>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq,
{
self.map.contains_key(key)
pub fn contains(&self, key: &K) -> bool {
self.map.get((*key).into()).copied().unwrap_or(false)
}
}

Expand Down Expand Up @@ -246,7 +235,7 @@ impl<
fn get_by_node<N: AsNodeRef>(&self, node: N) -> Option<Self::Value<'_>> {
let id = self.graph.internalise_node(node.as_node_ref())?;
match &self.keys {
Some(index) => index.map.get(&id).map(|i| &self.values[*i]),
Some(index) => index.index(&id).map(|i| &self.values[i]),
None => Some(&self.values[id.0]),
}
}
Expand Down
11 changes: 7 additions & 4 deletions raphtory/src/db/api/state/node_state_ops.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
core::entities::nodes::node_ref::AsNodeRef,
db::{
api::state::{node_state::NodeState, node_state_ord_ops, Index},
api::{
state::{node_state::NodeState, node_state_ord_ops, Index},
view::internal::CoreGraphOps,
},
graph::node::NodeView,
},
prelude::{GraphViewOps, NodeViewOps},
Expand Down Expand Up @@ -110,7 +113,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::from(keys)),
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
)
}
}
Expand All @@ -134,7 +137,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::from(keys)),
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
)
}

Expand Down Expand Up @@ -171,7 +174,7 @@ pub trait NodeStateOps<'graph>: IntoIterator<Item = Self::OwnedValue> {
self.base_graph().clone(),
self.graph().clone(),
values,
Some(Index::from(keys)),
Some(Index::new(keys, self.base_graph().unfiltered_num_nodes())),
)
}

Expand Down
3 changes: 1 addition & 2 deletions raphtory/src/db/api/view/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ impl<'graph, G: BoxableGraphView + Sized + Clone + 'graph> GraphViewOps<'graph>
.nodes()
.into_iter()
.filter(|node| !nodes_to_exclude.contains(&node.node))
.map(|node| node.node)
.collect();
.map(|node| node.node);

NodeSubgraph::new(self.clone(), nodes_to_include)
}
Expand Down
29 changes: 15 additions & 14 deletions raphtory/src/db/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ mod db_tests {
storage::arc_str::{ArcStr, OptionAsStr},
utils::logging::global_info_logger,
};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
#[cfg(feature = "proto")]
use tempfile::TempDir;
Expand Down Expand Up @@ -681,7 +680,7 @@ mod db_tests {
assert_eq!(gg.edges().len(), 2);

let gg = Graph::new();
let res = gg.add_edge(1, "C", "D", NO_PROPS, None);
gg.add_edge(1, "C", "D", NO_PROPS, None).unwrap();
let res = gg.import_edges(vec![&e_a_b, &e_c_d], false);
match res {
Err(GraphError::EdgesExistError(duplicates)) => {
Expand Down Expand Up @@ -823,11 +822,12 @@ mod db_tests {
#[test]
fn import_edge_as() {
let g = Graph::new();
let g_a = g.add_node(0, "A", NO_PROPS, None).unwrap();
g.add_node(0, "A", NO_PROPS, None).unwrap();
let g_b = g
.add_node(1, "B", vec![("temp".to_string(), Prop::Bool(true))], None)
.unwrap();
let _ = g_b.add_constant_properties(vec![("con".to_string(), Prop::I64(11))]);
g_b.add_constant_properties(vec![("con".to_string(), Prop::I64(11))])
.unwrap();
let e_a_b = g
.add_edge(
2,
Expand All @@ -848,8 +848,8 @@ mod db_tests {
.unwrap();

let gg = Graph::new();
let e = gg.add_edge(1, "X", "Y", NO_PROPS, None).unwrap();
let res = gg.import_edge_as(&e_b_c, ("Y", "Z"), false);
gg.add_edge(1, "X", "Y", NO_PROPS, None).unwrap();
gg.import_edge_as(&e_b_c, ("Y", "Z"), false).unwrap();
let res = gg.import_edge_as(&e_a_b, ("X", "Y"), false);
match res {
Err(EdgeExistsError(src_id, dst_id)) => {
Expand Down Expand Up @@ -883,7 +883,7 @@ mod db_tests {
#[test]
fn import_edge_as_merge() {
let g = Graph::new();
let g_a = g.add_node(0, "A", NO_PROPS, None).unwrap();
g.add_node(0, "A", NO_PROPS, None).unwrap();
let g_b = g
.add_node(1, "B", vec![("temp".to_string(), Prop::Bool(true))], None)
.unwrap();
Expand Down Expand Up @@ -920,12 +920,13 @@ mod db_tests {
#[test]
fn import_edges_as() {
let g = Graph::new();
let g_a = g.add_node(0, "A", NO_PROPS, None).unwrap();
g.add_node(0, "A", NO_PROPS, None).unwrap();
let g_b = g
.add_node(1, "B", vec![("temp".to_string(), Prop::Bool(true))], None)
.unwrap();
let _ = g_b.add_constant_properties(vec![("con".to_string(), Prop::I64(11))]);
let g_c = g.add_node(0, "C", NO_PROPS, None).unwrap();
g_b.add_constant_properties(vec![("con".to_string(), Prop::I64(11))])
.unwrap();
g.add_node(0, "C", NO_PROPS, None).unwrap();
let e_a_b = g
.add_edge(
2,
Expand All @@ -938,7 +939,7 @@ mod db_tests {
let e_b_c = g.add_edge(2, "B", "C", NO_PROPS, None).unwrap();

let gg = Graph::new();
let e = gg.add_edge(1, "Y", "Z", NO_PROPS, None).unwrap();
gg.add_edge(1, "Y", "Z", NO_PROPS, None).unwrap();
let res = gg.import_edges_as([&e_a_b, &e_b_c], [("X", "Y"), ("Y", "Z")], false);
match res {
Err(GraphError::EdgesExistError(duplicates)) => {
Expand Down Expand Up @@ -980,7 +981,7 @@ mod db_tests {
#[test]
fn import_edges_as_merge() {
let g = Graph::new();
let g_a = g.add_node(0, "A", NO_PROPS, None).unwrap();
g.add_node(0, "A", NO_PROPS, None).unwrap();
let g_b = g
.add_node(1, "B", vec![("temp".to_string(), Prop::Bool(true))], None)
.unwrap();
Expand All @@ -996,8 +997,8 @@ mod db_tests {
.unwrap();

let gg = Graph::new();
let _ = gg.add_edge(3, "X", "Y", NO_PROPS, None).unwrap();
let res = gg.import_edges_as([&e_a_b], [("X", "Y")], true).unwrap();
gg.add_edge(3, "X", "Y", NO_PROPS, None).unwrap();
gg.import_edges_as([&e_a_b], [("X", "Y")], true).unwrap();

let e_x_y = gg.edge("X", "Y").unwrap();
assert_eq!(
Expand Down
Loading

0 comments on commit 4672baa

Please sign in to comment.