Skip to content

Commit

Permalink
Various improvements for disk graph (#1866)
Browse files Browse the repository at this point in the history
* move LayerAdditions to disk and LayerIds to api

* disable pom-storage

* fix edge_frame_builder progress bar

* fix the python node properties tests

* add masked graph

* just collect in parallel to vec when masking

* add test to verify masked graph equals graph view

* remove some useless checks

* disable progress bars in graph loading from default

* fix some issues from review

* rename to cache_view

* changes as per review
  • Loading branch information
fabianmurariu authored Nov 28, 2024
1 parent f37eca6 commit c596410
Show file tree
Hide file tree
Showing 23 changed files with 664 additions and 298 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ numpy = "0.22.1"
itertools = "0.13.0"
rand = "0.8.5"
rayon = "1.8.1"
roaring = "0.10.6"
sorted_vector_map = "0.2.0"
tokio = { version = "1.36.0", features = ["full"] }
once_cell = "1.19.0"
Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
182 changes: 182 additions & 0 deletions raphtory-api/src/core/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::input::input_node::parse_u64_strict;
use bytemuck::{Pod, Zeroable};
use edges::edge_ref::EdgeRef;
use num_traits::ToPrimitive;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
fmt::{Display, Formatter},
sync::Arc,
};

pub mod edges;
Expand Down Expand Up @@ -311,3 +314,182 @@ impl<'a> GidRef<'a> {
}
}
}

#[derive(Clone, Debug)]
pub enum LayerIds {
None,
All,
One(usize),
Multiple(Multiple),
}

#[derive(Clone, Debug, Default)]
pub struct Multiple(pub Arc<[usize]>);

impl Multiple {
#[inline]
pub fn binary_search(&self, pos: &usize) -> Option<usize> {
self.0.binary_search(pos).ok()
}

#[inline]
pub fn into_iter(&self) -> impl Iterator<Item = usize> {
let ids = self.0.clone();
(0..ids.len()).map(move |i| ids[i])
}

#[inline]
pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
self.0.iter().copied()
}

#[inline]
pub fn find(&self, id: usize) -> Option<usize> {
self.0.get(id).copied()
}

#[inline]
pub fn par_iter(&self) -> impl rayon::iter::ParallelIterator<Item = usize> {
let bit_vec = self.0.clone();
(0..bit_vec.len()).into_par_iter().map(move |i| bit_vec[i])
}

#[inline]
pub fn len(&self) -> usize {
self.0.len()
}
}

impl FromIterator<usize> for Multiple {
fn from_iter<I: IntoIterator<Item = usize>>(iter: I) -> Self {
Multiple(iter.into_iter().collect())
}
}

impl From<Vec<usize>> for Multiple {
fn from(v: Vec<usize>) -> Self {
v.into_iter().collect()
}
}

#[cfg(test)]
mod test {
use crate::core::entities::Multiple;

#[test]
fn empty_bit_multiple() {
let bm = super::Multiple::default();
let actual = bm.into_iter().collect::<Vec<_>>();
let expected: Vec<usize> = vec![];
assert_eq!(actual, expected);
}

#[test]
fn set_one() {
let bm: Multiple = [1].into_iter().collect();
let actual = bm.into_iter().collect::<Vec<_>>();
assert_eq!(actual, vec![1usize]);
}

#[test]
fn set_two() {
let bm: Multiple = [1, 67].into_iter().collect();

let actual = bm.into_iter().collect::<Vec<_>>();
assert_eq!(actual, vec![1usize, 67]);
}
}

impl LayerIds {
pub fn find(&self, layer_id: usize) -> Option<usize> {
match self {
LayerIds::All => Some(layer_id),
LayerIds::One(id) => {
if *id == layer_id {
Some(layer_id)
} else {
None
}
}
LayerIds::Multiple(ids) => ids.binary_search(&layer_id).map(|_| layer_id),
LayerIds::None => None,
}
}

pub fn intersect(&self, other: &LayerIds) -> LayerIds {
match (self, other) {
(LayerIds::None, _) => LayerIds::None,
(_, LayerIds::None) => LayerIds::None,
(LayerIds::All, other) => other.clone(),
(this, LayerIds::All) => this.clone(),
(LayerIds::One(id), other) => {
if other.contains(id) {
LayerIds::One(*id)
} else {
LayerIds::None
}
}
(LayerIds::Multiple(ids), other) => {
let ids: Vec<usize> = ids.iter().filter(|id| other.contains(id)).collect();
match ids.len() {
0 => LayerIds::None,
1 => LayerIds::One(ids[0]),
_ => LayerIds::Multiple(ids.into()),
}
}
}
}

pub fn constrain_from_edge(&self, e: EdgeRef) -> Cow<LayerIds> {
match e.layer() {
None => Cow::Borrowed(self),
Some(l) => self
.find(l)
.map(|id| Cow::Owned(LayerIds::One(id)))
.unwrap_or(Cow::Owned(LayerIds::None)),
}
}

pub fn contains(&self, layer_id: &usize) -> bool {
self.find(*layer_id).is_some()
}

pub fn is_none(&self) -> bool {
matches!(self, LayerIds::None)
}
}

impl From<Vec<usize>> for LayerIds {
fn from(mut v: Vec<usize>) -> Self {
match v.len() {
0 => LayerIds::All,
1 => LayerIds::One(v[0]),
_ => {
v.sort_unstable();
v.dedup();
LayerIds::Multiple(v.into())
}
}
}
}

impl<const N: usize> From<[usize; N]> for LayerIds {
fn from(v: [usize; N]) -> Self {
match v.len() {
0 => LayerIds::All,
1 => LayerIds::One(v[0]),
_ => {
let mut v = v.to_vec();
v.sort_unstable();
v.dedup();
LayerIds::Multiple(v.into())
}
}
}
}

impl From<usize> for LayerIds {
fn from(id: usize) -> Self {
LayerIds::One(id)
}
}
8 changes: 8 additions & 0 deletions raphtory-api/src/core/utils/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ pub fn global_debug_logger() {
pub fn global_trace_logger() {
init_global_logger("TRACE".to_string())
}

pub fn sysout_debug() {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.with_span_events(FmtSpan::ENTER | FmtSpan::CLOSE)
.init();
}
1 change: 1 addition & 0 deletions raphtory-cypher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow.workspace = true
arrow-buffer.workspace = true
arrow-schema.workspace = true
arrow-array.workspace = true
tracing-subscriber.workspace = true

pest.workspace = true
pest_derive.workspace = true
Expand Down
6 changes: 3 additions & 3 deletions raphtory-cypher/examples/raphtory_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod cypher {
use futures::{stream, StreamExt};
use raphtory::{
disk_graph::{graph_impl::ParquetLayerCols, DiskGraphStorage},
logging::global_info_logger,
logging::{global_info_logger, sysout_debug},
};
use raphtory_cypher::{run_cypher, run_cypher_to_streams, run_sql};
use serde::{de::DeserializeOwned, Deserialize};
Expand Down Expand Up @@ -126,11 +126,11 @@ mod cypher {

// #[tokio::main]
pub async fn main() {
global_info_logger();
let args = Args::parse();

match args {
Args::Query(args) => {
global_info_logger();
let graph =
DiskGraphStorage::load_from_dir(&args.graph_dir).expect("Failed to load graph");

Expand All @@ -145,7 +145,6 @@ mod cypher {

let now = std::time::Instant::now();
let batches = df.collect().await.unwrap();
global_info_logger();
info!("Query execution time: {:?}", now.elapsed());
print_batches(&batches).expect("Failed to print batches");
} else {
Expand All @@ -161,6 +160,7 @@ mod cypher {
}

Args::Load(args) => {
sysout_debug();
let layers = args.layers;
let layer_parquet_cols = (0..layers.len())
.map(|layer_id| {
Expand Down
1 change: 1 addition & 0 deletions raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pometry-storage = { workspace = true, optional = true }

prost = { workspace = true, optional = true }
prost-types = { workspace = true, optional = true }
roaring ={ workspace = true }

[dev-dependencies]
csv = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions raphtory/src/algorithms/community_detection/louvain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ mod test {
test_storage,
};
use proptest::prelude::*;
#[cfg(feature = "io")]
use tracing::info;

#[cfg(feature = "io")]
Expand Down
Loading

0 comments on commit c596410

Please sign in to comment.