Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/dfview #1712

Merged
merged 12 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions python/tests/test_disk_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from raphtory import PyDirection, DiskGraphStorage
from raphtory import DiskGraphStorage
from raphtory import algorithms
import pandas as pd
import tempfile
Expand Down Expand Up @@ -35,17 +35,13 @@
).sort_values(["src", "dst", "time"])


def create_graph(edges, dir):
return DiskGraphStorage.load_from_pandas(dir, edges, "src", "dst", "time")


# in every test use with to create a temporary directory that will be deleted automatically
# after the with block ends


def test_counts():
dir = tempfile.TemporaryDirectory()
graph = create_graph(edges, dir.name).to_events()
graph_dir = tempfile.TemporaryDirectory()
graph = DiskGraphStorage.load_from_pandas(graph_dir.name, edges, "src", "dst", "time")
graph = graph.to_events()
assert graph.count_nodes() == 5
assert graph.count_edges() == 20

Expand Down Expand Up @@ -140,6 +136,7 @@ def test_disk_graph():
)
assert len(list(actual.get_all_with_names())) == 1624


def test_disk_graph_type_filter():
curr_dir = os.path.dirname(os.path.abspath(__file__))
rsc_dir = os.path.join(curr_dir, "..", "..", "pometry-storage-private", "resources")
Expand Down
2 changes: 2 additions & 0 deletions raphtory-graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ base64-compat = { workspace = true }
time = { workspace = true }
reqwest = { workspace = true }
moka = { workspace = true }
kdam = { workspace = true}


# python binding optional dependencies
pyo3 = { workspace = true, optional = true }
Expand Down
1 change: 0 additions & 1 deletion raphtory-graphql/src/model/graph/graphs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::data::get_graph_name;
use async_graphql::parser::Error;
use dynamic_graphql::{ResolvedObject, ResolvedObjectFields};
use itertools::Itertools;
use std::path::PathBuf;
Expand Down
6 changes: 3 additions & 3 deletions raphtory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ quad-rand = { workspace = true }
serde_json = { workspace = true }
ouroboros = { workspace = true }
either = { workspace = true }
kdam = { workspace = true}


# io optional dependencies
csv = { workspace = true, optional = true }
Expand All @@ -66,7 +68,6 @@ display-error-chain = { workspace = true, optional = true }
polars-arrow = { workspace = true, optional = true }
polars-parquet = { workspace = true, optional = true }
polars-utils = { workspace = true, optional = true }
kdam = { workspace = true, optional = true }
memmap2 = { workspace = true, optional = true }
ahash = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
Expand Down Expand Up @@ -125,8 +126,7 @@ python = [
"polars-arrow?/compute",
"raphtory-api/python",
"dep:rpds",
"dep:kdam",
"kdam?/notebook"
"kdam/notebook"
]

# storage
Expand Down
8 changes: 8 additions & 0 deletions raphtory/src/core/utils/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::core::{utils::time::error::ParseTimeError, Prop, PropType};
#[cfg(feature = "arrow")]
use polars_arrow::legacy::error;
#[cfg(feature = "python")]
use pyo3::PyErr;
use raphtory_api::core::{entities::GID, storage::arc_str::ArcStr};
use std::path::PathBuf;
#[cfg(feature = "search")]
Expand Down Expand Up @@ -185,6 +187,12 @@ pub enum GraphError {

#[error("Immutable graph is .. immutable!")]
AttemptToMutateImmutableGraph,

#[cfg(feature = "python")]
#[error("Python error occurred: {0}")]
PythonError(#[from] PyErr),
#[error("An error with Tdqm occurred")]
TqdmError,
}

impl GraphError {
Expand Down
108 changes: 52 additions & 56 deletions raphtory/src/io/arrow/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ use polars_arrow::{

use itertools::Itertools;

#[derive(Debug)]
pub(crate) struct DFView {
pub(crate) names: Vec<String>,
pub(crate) arrays: Vec<Vec<Box<dyn Array>>>,
pub(crate) struct DFView<I> {
pub names: Vec<String>,
pub(crate) chunks: I,
pub num_rows: usize,
}

impl DFView {
pub(crate) fn get_inner_size(&self) -> usize {
if self.arrays.is_empty() || self.arrays[0].is_empty() {
return 0;
impl<I, E> DFView<I>
where
I: Iterator<Item = Result<DFChunk, E>>,
{
pub(crate) fn new(names: Vec<String>, chunks: I, num_rows: usize) -> Self {
Self {
names,
chunks,
num_rows,
}
self.arrays[0][0].len()
}

pub fn check_cols_exist(&self, cols: &[&str]) -> Result<(), GraphError> {
Expand All @@ -36,66 +40,58 @@ impl DFView {
Ok(())
}

pub(crate) fn get_index(&self, name: &str) -> Result<usize, GraphError> {
self.names
.iter()
.position(|n| n == name)
.ok_or_else(|| GraphError::ColumnDoesNotExist(name.to_string()))
}
}

#[derive(Clone)]
pub(crate) struct DFChunk {
pub(crate) chunk: Vec<Box<dyn Array>>,
}

impl DFChunk {
pub(crate) fn iter_col<T: NativeType>(
&self,
name: &str,
idx: usize,
) -> Option<impl Iterator<Item = Option<&T>> + '_> {
let idx = self.names.iter().position(|n| n == name)?;

let _ = (&self.arrays[0])[idx]
let col_arr = (&self.chunk)[idx]
.as_any()
.downcast_ref::<PrimitiveArray<T>>()?;

let iter = self.arrays.iter().flat_map(move |arr| {
let arr = &arr[idx];
let arr = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
arr.iter()
});

Some(iter)
Some(col_arr.iter())
}

pub fn utf8<O: Offset>(&self, name: &str) -> Option<impl Iterator<Item = Option<&str>> + '_> {
let idx = self.names.iter().position(|n| n == name)?;
pub fn utf8<O: Offset>(&self, idx: usize) -> Option<impl Iterator<Item = Option<&str>> + '_> {
// test that it's actually a utf8 array
let _ = (&self.arrays[0])[idx]
.as_any()
.downcast_ref::<Utf8Array<O>>()?;

let iter = self.arrays.iter().flat_map(move |arr| {
let arr = &arr[idx];
let arr = arr.as_any().downcast_ref::<Utf8Array<O>>().unwrap();
arr.iter()
});
let col_arr = (&self.chunk)[idx].as_any().downcast_ref::<Utf8Array<O>>()?;

Some(iter)
Some(col_arr.iter())
}

pub fn time_iter_col(&self, name: &str) -> Option<impl Iterator<Item = Option<i64>> + '_> {
let idx = self.names.iter().position(|n| n == name)?;

let _ = (&self.arrays[0])[idx]
pub fn time_iter_col(&self, idx: usize) -> Option<impl Iterator<Item = Option<i64>> + '_> {
let col_arr = (&self.chunk)[idx]
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()?;

let iter = self.arrays.iter().flat_map(move |arr| {
let arr = &arr[idx];
let arr = if let DataType::Timestamp(_, _) = arr.data_type() {
let array = cast::cast(
&*arr.clone(),
&DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())),
CastOptions::default(),
)
.unwrap();
array
} else {
arr.clone()
};

let arr = arr.as_any().downcast_ref::<PrimitiveArray<i64>>().unwrap();
arr.clone().into_iter()
});

Some(iter)
let arr = if let DataType::Timestamp(_, _) = col_arr.data_type() {
let array = cast::cast(
col_arr,
&DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())),
CastOptions::default(),
)
.unwrap();
array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
.unwrap()
.clone()
} else {
col_arr.clone()
};

Some(arr.into_iter())
}
}
Loading
Loading