diff --git a/Cargo.lock b/Cargo.lock index bd9dc4cfbb..b8704669aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3988,6 +3988,7 @@ dependencies = [ "quickcheck_macros", "rand 0.8.5", "rand_distr", + "raphtory-api", "raphtory-arrow", "rayon", "regex", @@ -4008,9 +4009,42 @@ dependencies = [ "zip", ] +[[package]] +name = "raphtory-api" +version = "0.8.1" +dependencies = [ + "chrono", + "serde", +] + [[package]] name = "raphtory-arrow" version = "0.8.1" +dependencies = [ + "ahash", + "bincode", + "bytemuck", + "itertools 0.12.1", + "memmap2", + "num-traits", + "once_cell", + "parking_lot", + "polars-arrow", + "polars-parquet", + "polars-utils", + "proptest", + "raphtory-api", + "rayon", + "serde", + "serde_json", + "strum 0.26.2", + "tempfile", + "thiserror", + "tracing", + "tracing-subscriber", + "tracing-test", + "twox-hash", +] [[package]] name = "raphtory-benchmark" @@ -5483,6 +5517,29 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a2c0ff408fe918a94c428a3f2ad04e4afd5c95bbc08fcf868eff750c15728a4" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bc1c4f8e2e73a977812ab339d503e6feeb92700f6d07a6de4d321522d5c08" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.109", +] + [[package]] name = "triomphe" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index b0c51900e5..dd3ba83662 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "examples/netflow", "python", "js-raphtory", - "raphtory-graphql", + "raphtory-graphql", "raphtory-api", ] default-members = ["raphtory"] resolver = "2" diff --git a/raphtory-api/Cargo.toml b/raphtory-api/Cargo.toml new file mode 100644 index 0000000000..799cec954f --- /dev/null +++ b/raphtory-api/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "raphtory-api" +version.workspace = true +documentation.workspace = true +repository.workspace = true +license.workspace = true +readme.workspace = true +homepage.workspace = true +keywords.workspace = true +authors.workspace = true +rust-version.workspace = true +edition.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { workspace = true, features = ["derive"] } +chrono.workspace = true \ No newline at end of file diff --git a/raphtory/src/core/entities/edges/edge_ref.rs b/raphtory-api/src/core/entities/edges/edge_ref.rs similarity index 100% rename from raphtory/src/core/entities/edges/edge_ref.rs rename to raphtory-api/src/core/entities/edges/edge_ref.rs diff --git a/raphtory-api/src/core/entities/edges/mod.rs b/raphtory-api/src/core/entities/edges/mod.rs new file mode 100644 index 0000000000..e51063bf4d --- /dev/null +++ b/raphtory-api/src/core/entities/edges/mod.rs @@ -0,0 +1 @@ +pub mod edge_ref; diff --git a/raphtory-api/src/core/entities/mod.rs b/raphtory-api/src/core/entities/mod.rs new file mode 100644 index 0000000000..02cf442c00 --- /dev/null +++ b/raphtory-api/src/core/entities/mod.rs @@ -0,0 +1,87 @@ +use serde::{Deserialize, Serialize}; + +use self::edges::edge_ref::EdgeRef; + +pub mod edges; + +// the only reason this is public is because the physical ids of the nodes don't move +#[repr(transparent)] +#[derive( + Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, +)] +pub struct VID(pub usize); + +impl VID { + pub fn index(&self) -> usize { + self.0 + } + + pub fn as_u64(&self) -> u64 { + self.0 as u64 + } +} + +impl From for VID { + fn from(id: usize) -> Self { + VID(id) + } +} + +impl From for usize { + fn from(id: VID) -> Self { + id.0 + } +} + +#[repr(transparent)] +#[derive( + Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, +)] +pub struct EID(pub usize); + +impl From for usize { + fn from(id: EID) -> Self { + id.0 + } +} + +impl From for EID { + fn from(id: usize) -> Self { + EID(id) + } +} + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, +)] +pub struct ELID { + edge: EID, + layer: Option, +} + +impl ELID { + pub fn new(edge: EID, layer: Option) -> Self { + Self { edge, layer } + } + pub fn pid(&self) -> EID { + self.edge + } + + pub fn layer(&self) -> Option { + self.layer + } +} + +impl From for ELID { + fn from(value: EdgeRef) -> Self { + ELID { + edge: value.pid(), + layer: value.layer().copied(), + } + } +} +impl EID { + pub fn from_u64(id: u64) -> Self { + EID(id as usize) + } +} diff --git a/raphtory-api/src/core/mod.rs b/raphtory-api/src/core/mod.rs new file mode 100644 index 0000000000..c8a39e619a --- /dev/null +++ b/raphtory-api/src/core/mod.rs @@ -0,0 +1,22 @@ +pub mod entities; +pub mod storage; + +/// Denotes the direction of an edge. Can be incoming, outgoing or both. +#[derive( + Clone, + Copy, + Hash, + Eq, + PartialEq, + PartialOrd, + Debug, + Default, + serde::Serialize, + serde::Deserialize, +)] +pub enum Direction { + OUT, + IN, + #[default] + BOTH, +} diff --git a/raphtory-api/src/core/storage/mod.rs b/raphtory-api/src/core/storage/mod.rs new file mode 100644 index 0000000000..5309fd0959 --- /dev/null +++ b/raphtory-api/src/core/storage/mod.rs @@ -0,0 +1 @@ +pub mod timeindex; diff --git a/raphtory-api/src/core/storage/timeindex.rs b/raphtory-api/src/core/storage/timeindex.rs new file mode 100644 index 0000000000..e83bc96fed --- /dev/null +++ b/raphtory-api/src/core/storage/timeindex.rs @@ -0,0 +1,78 @@ +use std::{fmt, ops::Range}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)] +pub struct TimeIndexEntry(pub i64, pub usize); + +pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static { + fn t(&self) -> i64; + + fn dt(&self) -> Option> { + let t = self.t(); + DateTime::from_timestamp_millis(t) + } + + fn range(w: Range) -> Range; + + fn i(&self) -> usize { + 0 + } + + fn new(t: i64, s: usize) -> Self; +} + +impl From for TimeIndexEntry { + fn from(value: i64) -> Self { + Self::start(value) + } +} + +impl TimeIndexEntry { + pub const MIN: TimeIndexEntry = TimeIndexEntry(i64::MIN, 0); + + pub const MAX: TimeIndexEntry = TimeIndexEntry(i64::MAX, usize::MAX); + pub fn new(t: i64, s: usize) -> Self { + Self(t, s) + } + + pub fn start(t: i64) -> Self { + Self(t, 0) + } + + pub fn end(t: i64) -> Self { + Self(t.saturating_add(1), 0) + } +} + +impl AsTime for i64 { + fn t(&self) -> i64 { + *self + } + + fn range(w: Range) -> Range { + w + } + + fn new(t: i64, _s: usize) -> Self { + t + } +} + +impl AsTime for TimeIndexEntry { + fn t(&self) -> i64 { + self.0 + } + fn range(w: Range) -> Range { + Self::start(w.start)..Self::start(w.end) + } + + fn i(&self) -> usize { + self.1 + } + + fn new(t: i64, s: usize) -> Self { + Self(t, s) + } +} diff --git a/raphtory-api/src/lib.rs b/raphtory-api/src/lib.rs new file mode 100644 index 0000000000..5a7ca06a4f --- /dev/null +++ b/raphtory-api/src/lib.rs @@ -0,0 +1 @@ +pub mod core; diff --git a/raphtory-arrow/Cargo.toml b/raphtory-arrow/Cargo.toml index 6eec84f04a..36635f234f 100644 --- a/raphtory-arrow/Cargo.toml +++ b/raphtory-arrow/Cargo.toml @@ -1,4 +1,46 @@ [package] name = "raphtory-arrow" version = "0.8.1" +documentation = "https://raphtory.readthedocs.io/en/latest/" +repository = "https://github.com/Raphtory/raphtory-arrow/" +license = "GPL-3.0" +readme = "README.md" +homepage = "https://github.com/Raphtory/raphtory-arrow/" +keywords = ["graph", "temporal-graph", "temporal"] +authors = ["Pometry"] +rust-version = "1.77" +edition = "2021" + +[profile.release-with-debug] +inherits = "release" +debug = true + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + [dependencies] +raphtory-api = { path = "../raphtory-api", version = "0.8.1" } +ahash = { version = "0.8", features = ["serde"] } +bincode = "1.3.3" +bytemuck = "1.16.0" +itertools = "0.12.1" +memmap2 = "0.9.4" +num-traits = "0.2.19" +once_cell = "1.19.0" +parking_lot = "0.12.2" +polars-arrow = "0.39.2" +polars-parquet = { version = "0.39.2", features = ["compression"] } +polars-utils = "0.39.2" +rayon = "1.10.0" +serde = "1.0.201" +serde_json = "1.0.117" +strum = { version = "0.26.2", features = ["derive"] } +tempfile = "3.10.1" +thiserror = "1.0.60" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +twox-hash = "1.6.3" + +[dev-dependencies] +proptest = "1.4.0" +tracing-test = "0.2.4" diff --git a/raphtory-arrow/src/lib.rs b/raphtory-arrow/src/lib.rs index 8b13789179..415877292b 100644 --- a/raphtory-arrow/src/lib.rs +++ b/raphtory-arrow/src/lib.rs @@ -1 +1,472 @@ +use std::{ + borrow::Cow, + num::TryFromIntError, + ops::Range, + path::{Path, PathBuf}, +}; +use crate::arrow2::{ + array::{Array, StructArray}, + compute::concatenate::concatenate, + datatypes::{ArrowDataType as DataType, ArrowSchema as Schema, Field}, +}; +use itertools::Itertools; +use num_traits::ToPrimitive; +use polars_arrow::record_batch::RecordBatch; +use serde::{Deserialize, Serialize}; + +use crate::{ + arrow2::legacy::error, + load::parquet_reader::{NumRows, TrySlice}, +}; + +pub mod algorithms; +pub mod arrow_hmap; +pub mod chunked_array; +pub mod edge; +pub mod edges; +pub mod global_order; +pub mod graph; +pub mod graph_builder; +pub mod graph_fragment; +pub mod interop; +pub mod load; +pub mod nodes; +pub mod properties; +pub mod timestamps; +pub mod tprops; + +mod compute; + +pub type Time = i64; + +pub mod prelude { + pub use super::chunked_array::array_ops::*; +} + +#[derive(thiserror::Error, Debug)] +pub enum RAError { + #[error("Failed to memory map file {file:?}, source: {source}")] + MMap { + file: PathBuf, + source: error::PolarsError, + }, + #[error("Arrow error: {0}")] + Arrow(#[from] error::PolarsError), + #[error("IO error: {0}")] + IO(#[from] std::io::Error), + //serde error + #[error("Serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("Bad data type for node column: {0:?}")] + DType(DataType), + #[error("Graph directory is not empty before loading")] + GraphDirNotEmpty, + #[error("Invalid type for column: {0}")] + InvalidTypeColumn(String), + #[error("Column not found: {0}")] + ColumnNotFound(String), + #[error("No Edge lists found in input path")] + NoEdgeLists, + #[error("Unable to open graph: {0:?}")] + EmptyGraphDir(PathBuf), + #[error("Empty parquet chunk")] + EmptyChunk, + #[error("Conversion error: {0}")] + ArgumentError(#[from] TryFromIntError), + #[error("Invalid file: {0:?}")] + InvalidFile(PathBuf), + #[error("Invalid metadata: {0:?}")] + MetadataError(#[from] Box), + #[error("Failed to cast mmap_mut to [i64]: {0:?}")] + SliceCastError(bytemuck::PodCastError), + #[error("Failed to cast array")] + TypeCastError, + #[error("Missing chunk {0}")] + MissingChunk(usize), +} + +const TIME_COLUMN: &str = "rap_time"; +const TIME_COLUMN_IDX: usize = 0; + +pub(crate) const V_COLUMN: &str = "v"; +pub(crate) const E_COLUMN: &str = "e"; + +#[inline] +pub fn adj_schema() -> DataType { + DataType::Struct(vec![ + Field::new(V_COLUMN, DataType::UInt64, false), + Field::new(E_COLUMN, DataType::UInt64, false), + ]) +} + +pub(crate) mod file_prefix { + use std::{ + path::{Path, PathBuf}, + str::FromStr, + }; + + use itertools::Itertools; + use strum::{AsRefStr, EnumString}; + + use super::RAError; + + #[derive(AsRefStr, EnumString, PartialEq, Debug, Ord, PartialOrd, Eq, Copy, Clone)] + pub enum GraphPaths { + NodeAdditions, + NodeAdditionsOffsets, + NodeTProps, + NodeTPropsTimestamps, + NodeTPropsSecondaryIndex, + NodeTPropsOffsets, + NodeConstProps, + AdjOutSrcs, + AdjOutDsts, + AdjOutOffsets, + EdgeTPropsOffsets, + EdgeTProps, + AdjInSrcs, + AdjInEdges, + AdjInOffsets, + Metadata, + HashMap, + } + + #[derive(Debug, PartialEq, Ord, PartialOrd, Eq, Copy, Clone)] + pub struct GraphFile { + pub prefix: GraphPaths, + pub chunk: usize, + } + + impl GraphFile { + pub fn try_from_path(path: impl AsRef) -> Option { + let name = path.as_ref().file_stem()?.to_str()?; + let mut name_parts = name.split('-'); + let prefix = GraphPaths::from_str(name_parts.next()?).ok()?; + let chunk_str = name_parts.next(); + let chunk: usize = chunk_str?.parse().ok()?; + Some(Self { prefix, chunk }) + } + } + + impl GraphPaths { + pub fn try_from(path: impl AsRef) -> Option { + let path = path.as_ref(); + let name = path.file_name().and_then(|name| name.to_str())?; + let prefix = name.split('-').next()?; + GraphPaths::from_str(prefix).ok() + } + + pub fn to_path(&self, location_path: impl AsRef, id: usize) -> PathBuf { + let prefix: &str = self.as_ref(); + make_path(location_path, prefix, id) + } + } + + pub fn make_path(location_path: impl AsRef, prefix: &str, id: usize) -> PathBuf { + let file_path = location_path + .as_ref() + .join(format!("{}-{:08}.ipc", prefix, id)); + file_path + } + + pub fn sorted_file_list( + dir: impl AsRef, + prefix: GraphPaths, + ) -> Result, RAError> { + let mut files = dir + .as_ref() + .read_dir()? + .filter_map_ok(|f| { + let path = f.path(); + GraphFile::try_from_path(&path) + .filter(|f| f.prefix == prefix) + .map(|f| (f.chunk, path)) + }) + .collect::, _>>()?; + files.sort(); + for (i, (chunk, _)) in files.iter().enumerate() { + if &i != chunk { + return Err(RAError::MissingChunk(i)); + } + } + Ok(files.into_iter().map(|(_, path)| path)) + } +} + +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] +pub enum GID { + U64(u64), + I64(i64), + Str(String), +} + +impl GID { + pub fn into_str(self) -> Option { + match self { + GID::Str(v) => Some(v), + _ => None, + } + } + + pub fn into_i64(self) -> Option { + match self { + GID::I64(v) => Some(v), + _ => None, + } + } + + pub fn into_u64(self) -> Option { + match self { + GID::U64(v) => Some(v), + _ => None, + } + } + + pub fn as_str(&self) -> Option<&str> { + match self { + GID::Str(v) => Some(v.as_str()), + _ => None, + } + } + + pub fn as_i64(&self) -> Option { + match self { + GID::I64(v) => Some(*v), + _ => None, + } + } + + pub fn as_u64(&self) -> Option { + match self { + GID::U64(v) => Some(*v), + _ => None, + } + } + + pub fn to_str(&self) -> Cow { + match self { + GID::U64(v) => Cow::Owned(v.to_string()), + GID::I64(v) => Cow::Owned(v.to_string()), + GID::Str(v) => Cow::Borrowed(v), + } + } + + pub fn to_i64(&self) -> Option { + match self { + GID::U64(v) => v.to_i64(), + GID::I64(v) => Some(*v), + GID::Str(v) => parse_u64_strict(v)?.to_i64(), + } + } + + pub fn to_u64(&self) -> Option { + match self { + GID::U64(v) => Some(*v), + GID::I64(v) => v.to_u64(), + GID::Str(v) => parse_u64_strict(v), + } + } +} + +const MAX_U64_BYTES: [u8; 20] = [ + 49, 56, 52, 52, 54, 55, 52, 52, 48, 55, 51, 55, 48, 57, 53, 53, 49, 54, 49, 53, +]; + +pub fn parse_u64_strict(input: &str) -> Option { + if input.len() > 20 { + // a u64 string has at most 20 bytes + return None; + } + let byte_0 = b'0'; + let byte_1 = b'1'; + let byte_9 = b'9'; + let mut input_iter = input.bytes(); + let first = input_iter.next()?; + if first == byte_0 { + return input_iter.next().is_none().then_some(0); + } + if input.len() == 20 && (byte_1..=MAX_U64_BYTES[0]).contains(&first) { + let mut result = (first - byte_0) as u64; + for (next_byte, max_byte) in input_iter.zip(MAX_U64_BYTES[1..].iter().copied()) { + if !(byte_0..=max_byte).contains(&next_byte) { + return None; + } + result = result * 10 + (next_byte - byte_0) as u64; + } + return Some(result); + } + if (byte_1..=byte_9).contains(&first) { + let mut result = (first - byte_0) as u64; + for next_byte in input_iter { + if !(byte_0..=byte_9).contains(&next_byte) { + return None; + } + result = result * 10 + (next_byte - byte_0) as u64; + } + return Some(result); + } + + None +} + +impl From for GID { + fn from(id: u64) -> Self { + Self::U64(id) + } +} + +impl From for GID { + fn from(id: i64) -> Self { + Self::I64(id) + } +} + +impl From for GID { + fn from(id: String) -> Self { + Self::Str(id) + } +} + +impl From<&str> for GID { + fn from(id: &str) -> Self { + Self::Str(id.to_string()) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct GraphChunk { + srcs: Box, + dsts: Box, +} + +impl GraphChunk { + pub fn to_chunk(&self) -> RecordBatch> { + RecordBatch::new(vec![self.srcs.clone(), self.dsts.clone()]) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct PropsChunk(pub StructArray); + +impl NumRows for &PropsChunk { + fn num_rows(&self) -> usize { + self.0.len() + } +} + +impl TrySlice for &PropsChunk { + fn try_slice(&self, range: Range) -> Result { + self.0.try_slice(range) + } +} + +pub fn concat(arrays: Vec) -> Result { + let mut refs: Vec<&dyn Array> = Vec::with_capacity(arrays.len()); + for array in arrays.iter() { + refs.push(array); + } + Ok(concatenate(&refs)? + .as_any() + .downcast_ref::() + .unwrap() + .clone()) +} + +pub(crate) fn split_struct_chunk( + chunk: StructArray, + src_col_idx: usize, + dst_col_idx: usize, + time_col_idx: usize, +) -> (GraphChunk, PropsChunk) { + let (fields, cols, _) = chunk.into_data(); + split_chunk( + cols.to_vec(), + src_col_idx, + dst_col_idx, + time_col_idx, + fields.into(), + ) +} + +pub(crate) fn split_chunk>>( + columns_in_chunk: I, + src_col_idx: usize, + dst_col_idx: usize, + time_col_idx: usize, + chunk_schema: Schema, +) -> (GraphChunk, PropsChunk) { + let all_cols = columns_in_chunk.into_iter().collect_vec(); + + let time_d_type = all_cols[time_col_idx].data_type().clone(); + assert_eq!(time_d_type, DataType::Int64, "time column must be i64"); + let first_len = all_cols.first().unwrap().len(); + if all_cols.iter().any(|arr| arr.len() != first_len) { + panic!("All arrays in a chunk must have the same length"); + } + + let mut temporal_props = vec![all_cols[time_col_idx].clone()]; + for (i, column) in all_cols.iter().enumerate() { + if !(i == src_col_idx || i == dst_col_idx || i == time_col_idx) { + temporal_props.push(column.clone()); + } + } + + let mut props_only_schema = + chunk_schema.filter(|i, _| !(i == src_col_idx || i == dst_col_idx || i == time_col_idx)); + // put time as the first column in the struct + props_only_schema + .fields + .insert(0, Field::new(TIME_COLUMN, time_d_type, false)); + let data_type = DataType::Struct(props_only_schema.fields); + let t_prop_cols = StructArray::new(data_type, temporal_props, None); + + ( + GraphChunk { + srcs: all_cols[src_col_idx].clone(), + dsts: all_cols[dst_col_idx].clone(), + // time: all_cols[time_col_idx].clone(), + }, + PropsChunk(t_prop_cols), + ) +} + +fn prepare_graph_dir>(graph_dir: P) -> Result<(), RAError> { + // create graph dir if it does not exist + // if it exists make sure it's empty + std::fs::create_dir_all(&graph_dir)?; + + let mut dir_iter = std::fs::read_dir(&graph_dir)?; + if dir_iter.next().is_some() { + return Err(RAError::GraphDirNotEmpty); + } + + Ok(()) +} +pub mod utils { + + use std::hash::{Hash, Hasher}; + use twox_hash::XxHash64; + + use crate::GID; + + pub fn calculate_hash(t: &T) -> u64 { + let mut s = XxHash64::default(); + t.hash(&mut s); + s.finish() + } + + pub fn calculate_hash_spark(gid: &GID) -> i64 { + let mut s = XxHash64::with_seed(42); + match gid { + GID::U64(x) => s.write_u64(*x), + GID::I64(x) => s.write_i64(*x), + GID::Str(t) => { + t.chars().for_each(|c| s.write_u8(c as u8)); + } + } + s.finish() as i64 + } +} + +pub use polars_arrow as arrow2; diff --git a/raphtory-cypher/src/executor/table_provider/node.rs b/raphtory-cypher/src/executor/table_provider/node.rs index d7520bd521..e0d9deefb4 100644 --- a/raphtory-cypher/src/executor/table_provider/node.rs +++ b/raphtory-cypher/src/executor/table_provider/node.rs @@ -21,7 +21,10 @@ use datafusion::{ use futures::Stream; use raphtory_arrow::properties::Properties; -use raphtory::arrow::{graph_impl::ArrowGraph, prelude::*}; +use raphtory::{ + arrow::{graph_impl::ArrowGraph, prelude::*}, + core::entities::VID, +}; use crate::{ arrow2::{self, array::to_data, datatypes::ArrowDataType}, @@ -64,7 +67,7 @@ impl NodeTableProvider { pub fn lift_arrow_schema( gid_dt: ArrowDataType, - properties: Option<&Properties>, + properties: Option<&Properties>, ) -> Result { let mut fields = vec![]; diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index aacd181300..cf61f0fe7b 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -15,6 +15,7 @@ homepage.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +raphtory-api = { path = "../raphtory-api", version="0.8.1" } bincode = { workspace = true } chrono = { workspace = true } itertools = { workspace = true } @@ -88,7 +89,7 @@ streaming-stats = { workspace = true } proptest = { workspace = true } [features] -default = [] +default = ["arrow"] # Enables the graph loader io module io = [ "dep:zip", diff --git a/raphtory/src/arrow/graph_impl/interop.rs b/raphtory/src/arrow/graph_impl/interop.rs index db0787753f..9dd184815a 100644 --- a/raphtory/src/arrow/graph_impl/interop.rs +++ b/raphtory/src/arrow/graph_impl/interop.rs @@ -16,7 +16,8 @@ use crate::{ }; use itertools::Itertools; use polars_arrow::array::Array; -use raphtory_arrow::interop::{AsEID, AsVID, GraphLike, EID, VID}; +use raphtory_api::core::entities::{EID, VID}; +use raphtory_arrow::interop::GraphLike; impl GraphLike for Graph { fn external_ids(&self) -> Vec { @@ -44,24 +45,24 @@ impl GraphLike for Graph { self.count_edges() } - fn out_degree(&self, vid: impl AsVID, layer: usize) -> usize { - self.core_node_entry(vid.as_vid().0.into()) + fn out_degree(&self, vid: VID, layer: usize) -> usize { + self.core_node_entry(vid.0.into()) .degree(&LayerIds::One(layer), Direction::OUT) } - fn in_degree(&self, vid: impl AsVID, layer: usize) -> usize { - self.core_node_entry(vid.as_vid().0.into()) + fn in_degree(&self, vid: VID, layer: usize) -> usize { + self.core_node_entry(vid.0.into()) .degree(&LayerIds::One(layer), Direction::IN) } - fn in_edges(&self, vid: impl AsVID, layer: usize, map: impl Fn(VID, EID) -> B) -> Vec { - let node = self.core_node_entry(vid.as_vid().0.into()); + fn in_edges(&self, vid: VID, layer: usize, map: impl Fn(VID, EID) -> B) -> Vec { + let node = self.core_node_entry(vid.0.into()); node.edges_iter(&LayerIds::One(layer), Direction::IN) .map(|edge| map(edge.src().into(), edge.pid().into())) .collect() } - fn out_edges(&self, vid: impl AsVID, layer: usize) -> Vec<(VID, VID, EID)> { - let node = self.core_node_entry(vid.as_vid().0.into()); + fn out_edges(&self, vid: VID, layer: usize) -> Vec<(VID, VID, EID)> { + let node = self.core_node_entry(vid.0.into()); let edges = node .edges_iter(&LayerIds::One(layer), Direction::OUT) .map(|edge| { @@ -74,8 +75,8 @@ impl GraphLike for Graph { edges } - fn edge_additions(&self, eid: impl AsEID, layer: usize) -> Vec { - let el_id = ELID::new(eid.as_eid().0.into(), Some(layer)); + fn edge_additions(&self, eid: EID, layer: usize) -> Vec { + let el_id = ELID::new(eid.0.into(), Some(layer)); let edge = self.core_edge(el_id); let timestamps: Vec<_> = edge.additions(layer).iter().collect(); timestamps @@ -86,8 +87,8 @@ impl GraphLike for Graph { props.into_iter().map(|s| s.to_string()).collect() } - fn find_name(&self, vid: impl AsVID) -> Option { - self.core_node_entry(vid.as_vid().0.into()) + fn find_name(&self, vid: VID) -> Option { + self.core_node_entry(vid.0.into()) .name() .map(|s| s.to_string()) } diff --git a/raphtory/src/arrow/graph_impl/prop_conversion.rs b/raphtory/src/arrow/graph_impl/prop_conversion.rs index 648e559e24..376062933a 100644 --- a/raphtory/src/arrow/graph_impl/prop_conversion.rs +++ b/raphtory/src/arrow/graph_impl/prop_conversion.rs @@ -20,7 +20,7 @@ use std::path::Path; pub fn make_node_properties_from_graph( graph: &Graph, graph_dir: impl AsRef, -) -> Result>, RAError> { +) -> Result>, RAError> { let graph_dir = graph_dir.as_ref(); let n = graph.unfiltered_num_nodes(); @@ -65,7 +65,7 @@ pub fn make_node_properties_from_graph( let prop_type = temporal_meta.get_dtype(prop_id).unwrap(); let col = arrow_array_from_props( (0..n).flat_map(|vid| { - let ts = node_ts(raphtory_arrow::interop::VID(vid), offsets, ts); + let ts = node_ts(VID(vid), offsets, ts); let node = nodes.get(VID(vid)); ts.iter() .map(move |t| node.temporal_property(prop_id).and_then(|prop| prop.at(t))) diff --git a/raphtory/src/arrow/mod.rs b/raphtory/src/arrow/mod.rs index 0f94c46246..be37356346 100644 --- a/raphtory/src/arrow/mod.rs +++ b/raphtory/src/arrow/mod.rs @@ -29,12 +29,11 @@ mod test { datatypes::Field, }; use proptest::{prelude::*, sample::size_range}; - use raphtory_arrow::{ - global_order::GlobalMap, - graph_fragment::TempColGraphFragment, - interop::{Direction, EID, VID}, - RAError, + use raphtory_api::core::{ + entities::{EID, VID}, + Direction, }; + use raphtory_arrow::{global_order::GlobalMap, graph_fragment::TempColGraphFragment, RAError}; use tempfile::TempDir; fn edges_sanity_node_list(edges: &[(u64, u64, i64)]) -> Vec { @@ -367,7 +366,7 @@ mod test { mod addition_bounds { use itertools::Itertools; use proptest::{prelude::*, sample::size_range}; - use raphtory_arrow::interop::VID; + use raphtory_api::core::entities::VID; use tempfile::TempDir; use super::{ diff --git a/raphtory/src/arrow/storage_interface/node.rs b/raphtory/src/arrow/storage_interface/node.rs index 0e87effb4c..b41c01ea69 100644 --- a/raphtory/src/arrow/storage_interface/node.rs +++ b/raphtory/src/arrow/storage_interface/node.rs @@ -22,7 +22,7 @@ use std::{iter, sync::Arc}; #[derive(Copy, Clone, Debug)] pub struct ArrowNode<'a> { - pub(super) properties: Option<&'a Properties>, + pub(super) properties: Option<&'a Properties>, pub(super) layers: &'a Arc<[TempColGraphFragment]>, pub(super) vid: VID, } @@ -292,7 +292,7 @@ impl<'a> NodeStorageOps<'a> for ArrowNode<'a> { #[derive(Clone, Debug)] pub struct ArrowOwnedNode { - properties: Option>, + properties: Option>, layers: Arc<[TempColGraphFragment]>, vid: VID, } diff --git a/raphtory/src/arrow/storage_interface/nodes.rs b/raphtory/src/arrow/storage_interface/nodes.rs index 111f96b145..bb894416a9 100644 --- a/raphtory/src/arrow/storage_interface/nodes.rs +++ b/raphtory/src/arrow/storage_interface/nodes.rs @@ -11,7 +11,7 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct ArrowNodesOwned { num_nodes: usize, - properties: Option>, + properties: Option>, layers: Arc<[TempColGraphFragment]>, } diff --git a/raphtory/src/arrow/storage_interface/nodes_ref.rs b/raphtory/src/arrow/storage_interface/nodes_ref.rs index bb1c7dce86..6a21f6e7b9 100644 --- a/raphtory/src/arrow/storage_interface/nodes_ref.rs +++ b/raphtory/src/arrow/storage_interface/nodes_ref.rs @@ -8,7 +8,7 @@ use std::sync::Arc; #[derive(Copy, Clone, Debug)] pub struct ArrowNodesRef<'a> { pub(super) num_nodes: usize, - pub(super) properties: Option<&'a Properties>, + pub(super) properties: Option<&'a Properties>, pub(super) layers: &'a Arc<[TempColGraphFragment]>, } diff --git a/raphtory/src/core/entities/edges/edge_store.rs b/raphtory/src/core/entities/edges/edge_store.rs index 98c51d8a3a..a00988a275 100644 --- a/raphtory/src/core/entities/edges/edge_store.rs +++ b/raphtory/src/core/entities/edges/edge_store.rs @@ -1,7 +1,6 @@ use crate::{ core::{ entities::{ - edges::edge_ref::EdgeRef, properties::{props::Props, tprop::TProp}, LayerIds, EID, VID, }, @@ -18,13 +17,17 @@ use crate::{ view::{BoxedLIter, IntoDynBoxed}, }, }; + +use raphtory_api::core::entities::edges::edge_ref::EdgeRef; +pub use raphtory_api::core::entities::edges::*; + use itertools::{EitherOrBoth, Itertools}; use ouroboros::self_referencing; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::{ iter, - ops::{Deref, DerefMut, Range}, + ops::{DerefMut, Range}, }; #[derive(Serialize, Deserialize, Debug, Default, PartialEq)] @@ -87,13 +90,11 @@ impl EdgeLayer { } } -impl> From for EdgeRef { - fn from(val: E) -> Self { - EdgeRef::new_outgoing(val.eid, val.src, val.dst) +impl EdgeStore { + pub fn as_edge_ref(&self) -> EdgeRef { + EdgeRef::new_outgoing(self.eid, self.src, self.dst) } -} -impl EdgeStore { pub fn internal_num_layers(&self) -> usize { self.layers .len() diff --git a/raphtory/src/core/entities/edges/mod.rs b/raphtory/src/core/entities/edges/mod.rs index 7c83a0c4b3..d1f7224234 100644 --- a/raphtory/src/core/entities/edges/mod.rs +++ b/raphtory/src/core/entities/edges/mod.rs @@ -1,2 +1,3 @@ -pub mod edge_ref; pub mod edge_store; + +pub use raphtory_api::core::entities::edges::*; diff --git a/raphtory/src/core/entities/mod.rs b/raphtory/src/core/entities/mod.rs index 8e3c21a7be..1deb0e08cc 100644 --- a/raphtory/src/core/entities/mod.rs +++ b/raphtory/src/core/entities/mod.rs @@ -1,148 +1,13 @@ use std::sync::Arc; -#[cfg(feature = "arrow")] -use raphtory_arrow::interop::{AsEID, AsVID}; -use serde::{Deserialize, Serialize}; - -use crate::core::entities::edges::edge_ref::EdgeRef; +use raphtory_api::core::entities::edges::edge_ref::EdgeRef; pub mod edges; pub mod graph; pub mod nodes; pub mod properties; -// the only reason this is public is because the physical ids of the nodes don't move -#[repr(transparent)] -#[derive( - Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, -)] -pub struct VID(pub usize); - -impl VID { - pub fn index(&self) -> usize { - self.0 - } - - pub fn as_u64(&self) -> u64 { - self.0 as u64 - } -} - -impl From for VID { - fn from(id: usize) -> Self { - VID(id) - } -} - -#[cfg(feature = "arrow")] -impl From for VID { - fn from(vid: raphtory_arrow::interop::VID) -> Self { - VID(vid.0) - } -} - -impl From for usize { - fn from(id: VID) -> Self { - id.0 - } -} - -#[cfg(feature = "arrow")] -impl AsVID for VID { - fn as_vid(&self) -> raphtory_arrow::interop::VID { - raphtory_arrow::interop::VID::from(self.0) - } -} - -#[cfg(feature = "arrow")] -impl PartialEq for raphtory_arrow::interop::VID { - fn eq(&self, other: &VID) -> bool { - self.0 == other.0 - } -} - -#[cfg(feature = "arrow")] -impl Into for VID { - #[inline] - fn into(self) -> raphtory_arrow::interop::VID { - raphtory_arrow::interop::VID(self.0) - } -} - -#[repr(transparent)] -#[derive( - Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, -)] -pub struct EID(pub usize); - -#[cfg(feature = "arrow")] -impl From for EID { - fn from(eid: raphtory_arrow::interop::EID) -> Self { - EID(eid.0) - } -} - -#[cfg(feature = "arrow")] -impl Into for EID { - #[inline] - fn into(self) -> raphtory_arrow::interop::EID { - raphtory_arrow::interop::EID(self.0) - } -} - -#[cfg(feature = "arrow")] -impl AsEID for EID { - fn as_eid(&self) -> raphtory_arrow::interop::EID { - raphtory_arrow::interop::EID(self.0) - } -} - -#[derive( - Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, Serialize, Default, -)] -pub struct ELID { - edge: EID, - layer: Option, -} - -impl ELID { - pub fn new(edge: EID, layer: Option) -> Self { - Self { edge, layer } - } - pub fn pid(&self) -> EID { - self.edge - } - - pub fn layer(&self) -> Option { - self.layer - } -} - -impl From for ELID { - fn from(value: EdgeRef) -> Self { - ELID { - edge: value.pid(), - layer: value.layer().copied(), - } - } -} -impl EID { - pub fn from_u64(id: u64) -> Self { - EID(id as usize) - } -} - -impl From for usize { - fn from(id: EID) -> Self { - id.0 - } -} - -impl From for EID { - fn from(id: usize) -> Self { - EID(id) - } -} +pub use raphtory_api::core::entities::*; #[derive(Clone, Debug)] pub enum LayerIds { diff --git a/raphtory/src/core/mod.rs b/raphtory/src/core/mod.rs index c41d40d3cd..b2615f4a1b 100644 --- a/raphtory/src/core/mod.rs +++ b/raphtory/src/core/mod.rs @@ -29,8 +29,6 @@ use crate::{ prelude::GraphViewOps, }; use chrono::{DateTime, NaiveDateTime, Utc}; -#[cfg(feature = "arrow")] -use raphtory_arrow::interop::AsDir; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::{ @@ -136,36 +134,7 @@ impl<'a, O: AsRef + 'a> OptionAsStr<'a> for Option<&'a O> { } } -/// Denotes the direction of an edge. Can be incoming, outgoing or both. -#[derive( - Clone, - Copy, - Hash, - Eq, - PartialEq, - PartialOrd, - Debug, - Default, - serde::Serialize, - serde::Deserialize, -)] -pub enum Direction { - OUT, - IN, - #[default] - BOTH, -} - -#[cfg(feature = "arrow")] -impl AsDir for Direction { - fn as_dir(&self) -> raphtory_arrow::interop::Direction { - match self { - Direction::OUT => raphtory_arrow::interop::Direction::OUT, - Direction::IN => raphtory_arrow::interop::Direction::IN, - Direction::BOTH => raphtory_arrow::interop::Direction::BOTH, - } - } -} +pub use raphtory_api::core::*; #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] pub enum Lifespan { diff --git a/raphtory/src/core/storage/timeindex.rs b/raphtory/src/core/storage/timeindex.rs index ac80b386fa..006bd844f6 100644 --- a/raphtory/src/core/storage/timeindex.rs +++ b/raphtory/src/core/storage/timeindex.rs @@ -17,91 +17,7 @@ use std::{ ops::{Deref, Range}, }; -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)] -pub struct TimeIndexEntry(pub i64, pub usize); - -pub trait AsTime: Debug + Copy + Ord + Eq + Send + Sync + 'static { - fn t(&self) -> i64; - - fn dt(&self) -> Option> { - let t = self.t(); - DateTime::from_timestamp_millis(t) - } - - fn range(w: Range) -> Range; -} - -#[cfg(feature = "arrow")] -impl raphtory_arrow::interop::AsTime for TimeIndexEntry { - fn t(&self) -> i64 { - self.0 - } - - fn range(w: Range) -> Range { - Self::start(w.start)..Self::start(w.end) - } - - fn new(t: i64, s: usize) -> Self { - Self(t, s) - } - - fn i(&self) -> usize { - self.1 - } -} - -impl From for TimeIndexEntry { - fn from(value: i64) -> Self { - Self::start(value) - } -} - -impl TimeIndexEntry { - pub const MIN: TimeIndexEntry = TimeIndexEntry(i64::MIN, 0); - - pub const MAX: TimeIndexEntry = TimeIndexEntry(i64::MAX, usize::MAX); - pub fn new(t: i64, s: usize) -> Self { - Self(t, s) - } - - pub fn from_input( - g: &G, - t: T, - ) -> Result { - let t = t.try_into_input_time()?; - Ok(match t { - InputTime::Simple(t) => Self::new(t, g.next_event_id()), - InputTime::Indexed(t, s) => Self::new(t, s), - }) - } - - pub fn start(t: i64) -> Self { - Self(t, 0) - } - - pub fn end(t: i64) -> Self { - Self(t.saturating_add(1), 0) - } -} - -impl AsTime for i64 { - fn t(&self) -> i64 { - *self - } - - fn range(w: Range) -> Range { - w - } -} - -impl AsTime for TimeIndexEntry { - fn t(&self) -> i64 { - self.0 - } - fn range(w: Range) -> Range { - Self::start(w.start)..Self::start(w.end) - } -} +pub use raphtory_api::core::storage::timeindex::*; #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum TimeIndex { @@ -375,6 +291,7 @@ pub trait TimeIndexOps: Send + Sync { Self: 'a; fn active(&self, w: Range) -> bool; + fn active_t(&self, w: Range) -> bool { self.active(Self::IndexType::range(w)) } diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index ef57e15f70..d1a50e619f 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -1,7 +1,6 @@ use crate::{ core::{ entities::{edges::edge_ref::EdgeRef, nodes::input_node::InputNode}, - storage::timeindex::TimeIndexEntry, utils::{errors::GraphError, time::IntoTimeWithFormat}, Prop, }, @@ -14,6 +13,8 @@ use crate::{ }, }; +use super::time_from_input; + pub trait AdditionOps: StaticGraphViewOps { // TODO: Probably add vector reference here like add /// Add a node to the graph @@ -112,7 +113,7 @@ impl AdditionOps for G { |name, dtype| self.resolve_node_property(name, dtype, false), |prop| self.process_prop_value(prop), )?; - let ti = TimeIndexEntry::from_input(self, t)?; + let ti = time_from_input(self, t)?; let v_id = self.resolve_node(v.id(), v.id_str()); let type_id = self.resolve_node_type(v_id, node_type)?; self.internal_add_node(ti, v_id, properties, type_id)?; @@ -127,7 +128,7 @@ impl AdditionOps for G { props: PI, layer: Option<&str>, ) -> Result, GraphError> { - let ti = TimeIndexEntry::from_input(self, t)?; + let ti = time_from_input(self, t)?; let src_id = self.resolve_node(src.id(), src.id_str()); let dst_id = self.resolve_node(dst.id(), dst.id_str()); let layer_id = self.resolve_layer(layer); diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index 085e95576c..9ab844abeb 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -1,7 +1,6 @@ use crate::{ core::{ entities::nodes::input_node::InputNode, - storage::timeindex::TimeIndexEntry, utils::{errors::GraphError, time::IntoTimeWithFormat}, }, db::api::mutation::{ @@ -10,6 +9,8 @@ use crate::{ }, }; +use super::time_from_input; + pub trait DeletionOps: InternalDeletionOps + InternalAdditionOps + Sized { fn delete_edge( &self, @@ -18,7 +19,7 @@ pub trait DeletionOps: InternalDeletionOps + InternalAdditionOps + Sized { dst: V, layer: Option<&str>, ) -> Result<(), GraphError> { - let ti = TimeIndexEntry::from_input(self, t)?; + let ti = time_from_input(self, t)?; let src_id = self.resolve_node(src.id(), src.id_str()); let dst_id = self.resolve_node(dst.id(), src.id_str()); let layer = self.resolve_layer(layer); diff --git a/raphtory/src/db/api/mutation/import_ops.rs b/raphtory/src/db/api/mutation/import_ops.rs index a1a4bb896c..8ad52c0070 100644 --- a/raphtory/src/db/api/mutation/import_ops.rs +++ b/raphtory/src/db/api/mutation/import_ops.rs @@ -1,7 +1,6 @@ use crate::{ core::{ entities::LayerIds, - storage::timeindex::TimeIndexEntry, utils::errors::{ GraphError, GraphError::{EdgeExistsError, NodeExistsError}, @@ -21,6 +20,8 @@ use crate::{ prelude::{AdditionOps, EdgeViewOps, NodeViewOps}, }; +use super::time_from_input; + pub trait ImportOps: StaticGraphViewOps + InternalAdditionOps @@ -133,7 +134,7 @@ impl< .unwrap_or(0usize); for h in node.history() { - let t = TimeIndexEntry::from_input(self, h)?; + let t = time_from_input(self, h)?; self.internal_add_node(t, node_internal, vec![], node_internal_type_id)?; } for (name, prop_view) in node.properties().temporal().iter() { @@ -152,7 +153,7 @@ impl< let new_prop_id = self.resolve_node_property(&name, dtype, false)?; for (h, prop) in prop_view.iter() { let new_prop = self.process_prop_value(prop); - let t = TimeIndexEntry::from_input(self, h)?; + let t = time_from_input(self, h)?; self.internal_add_node( t, node_internal, @@ -217,7 +218,7 @@ impl< if self.include_deletions() { for t in edge.graph.edge_deletion_history(edge.edge, &layer_ids) { - let ti = TimeIndexEntry::from_input(self, t)?; + let ti = time_from_input(self, t)?; let src_id = self.resolve_node(edge.src().id(), Some(&edge.src().name())); let dst_id = self.resolve_node(edge.dst().id(), Some(&edge.dst().name())); let layer = self.resolve_layer(layer_name); diff --git a/raphtory/src/db/api/mutation/mod.rs b/raphtory/src/db/api/mutation/mod.rs index 9d552dc998..e5494453e7 100644 --- a/raphtory/src/db/api/mutation/mod.rs +++ b/raphtory/src/db/api/mutation/mod.rs @@ -19,6 +19,9 @@ pub use addition_ops::AdditionOps; pub use deletion_ops::DeletionOps; pub use import_ops::ImportOps; pub use property_addition_ops::PropertyAdditionOps; +use raphtory_api::core::storage::timeindex::TimeIndexEntry; + +use self::internal::InternalAdditionOps; /// Used to handle automatic injection of secondary index if not explicitly provided pub enum InputTime { @@ -26,6 +29,17 @@ pub enum InputTime { Indexed(i64, usize), } +pub fn time_from_input( + g: &G, + t: T, +) -> Result { + let t = t.try_into_input_time()?; + Ok(match t { + InputTime::Simple(t) => TimeIndexEntry::new(t, g.next_event_id()), + InputTime::Indexed(t, s) => TimeIndexEntry::new(t, s), + }) +} + pub trait TryIntoInputTime { fn try_into_input_time(self) -> Result; } diff --git a/raphtory/src/db/api/mutation/property_addition_ops.rs b/raphtory/src/db/api/mutation/property_addition_ops.rs index c3d9905ec1..9c26585270 100644 --- a/raphtory/src/db/api/mutation/property_addition_ops.rs +++ b/raphtory/src/db/api/mutation/property_addition_ops.rs @@ -9,7 +9,7 @@ use crate::{ }, }; -use super::CollectProperties; +use super::{time_from_input, CollectProperties}; pub trait PropertyAdditionOps { fn add_properties( @@ -31,7 +31,7 @@ impl PropertyAdditionOps f t: T, props: PI, ) -> Result<(), GraphError> { - let ti = TimeIndexEntry::from_input(self, t)?; + let ti = time_from_input(self, t)?; let properties: Vec<_> = props.collect_properties( |name, _| Ok(self.resolve_graph_property(name, false)), |prop| self.process_prop_value(prop), diff --git a/raphtory/src/db/api/storage/storage_ops.rs b/raphtory/src/db/api/storage/storage_ops.rs index 6ef7e45ad0..10d136afe8 100644 --- a/raphtory/src/db/api/storage/storage_ops.rs +++ b/raphtory/src/db/api/storage/storage_ops.rs @@ -252,7 +252,7 @@ impl GraphStorage { let iter = (0..edges.len()).map(EID); let filtered = match view.filter_state() { FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| EdgeRef::from(edges.get(eid)))) + FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) } FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { let e = EdgeStorageRef::Mem(edges.get(e)); @@ -378,7 +378,7 @@ impl GraphStorage { let iter = (0..edges.len()).into_par_iter().map(EID); let filtered = match view.filter_state() { FilterState::Neither => { - FilterVariants::Neither(iter.map(move |eid| EdgeRef::from(edges.get(eid)))) + FilterVariants::Neither(iter.map(move |eid| edges.get(eid).as_edge_ref())) } FilterState::Both => FilterVariants::Both(iter.filter_map(move |e| { let e = EdgeStorageRef::Mem(edges.get(e)); diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 05def8578c..e5ae56d652 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -18,7 +18,7 @@ use crate::{ api::{ mutation::{ internal::{InternalAdditionOps, InternalDeletionOps, InternalPropertyAdditionOps}, - CollectProperties, TryIntoInputTime, + time_from_input, CollectProperties, TryIntoInputTime, }, properties::{ internal::{ConstPropertiesOps, TemporalPropertiesOps, TemporalPropertyViewOps}, @@ -85,7 +85,7 @@ impl< > EdgeView { pub fn delete(&self, t: T, layer: Option<&str>) -> Result<(), GraphError> { - let t = TimeIndexEntry::from_input(&self.graph, t)?; + let t = time_from_input(&self.graph, t)?; let layer = self.resolve_layer(layer, true)?; self.graph .internal_delete_edge(t, self.edge.src(), self.edge.dst(), layer) @@ -249,7 +249,7 @@ impl props: C, layer: Option<&str>, ) -> Result<(), GraphError> { - let t = TimeIndexEntry::from_input(&self.graph, time)?; + let t = time_from_input(&self.graph, time)?; let layer_id = self.resolve_layer(layer, true)?; let properties: Vec<(usize, Prop)> = props.collect_properties( |name, dtype| self.graph.resolve_edge_property(name, dtype, false), diff --git a/raphtory/src/db/graph/node.rs b/raphtory/src/db/graph/node.rs index 9dbf4f23e6..d12e59edc7 100644 --- a/raphtory/src/db/graph/node.rs +++ b/raphtory/src/db/graph/node.rs @@ -11,7 +11,7 @@ use crate::{ api::{ mutation::{ internal::{InternalAdditionOps, InternalPropertyAdditionOps}, - CollectProperties, TryIntoInputTime, + time_from_input, CollectProperties, TryIntoInputTime, }, properties::{ internal::{ConstPropertiesOps, TemporalPropertiesOps, TemporalPropertyViewOps}, @@ -358,7 +358,7 @@ impl time: T, props: C, ) -> Result<(), GraphError> { - let t = TimeIndexEntry::from_input(&self.graph, time)?; + let t = time_from_input(&self.graph, time)?; let properties: Vec<(usize, Prop)> = props.collect_properties( |name, dtype| self.graph.resolve_node_property(name, dtype, false), |prop| self.graph.process_prop_value(prop),