diff --git a/.github/workflows/_release_python.yml b/.github/workflows/_release_python.yml index 88967a2b18..0160cad145 100644 --- a/.github/workflows/_release_python.yml +++ b/.github/workflows/_release_python.yml @@ -24,6 +24,10 @@ jobs: permission: "write" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Set up cargo cache uses: actions/cache@v3 continue-on-error: false @@ -121,6 +125,10 @@ jobs: permission: "write" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: actions/checkout@v3 with: ref: ${{ inputs.base }} @@ -157,6 +165,10 @@ jobs: permission: "write" env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Set up cargo cache uses: actions/cache@v3 continue-on-error: false diff --git a/.github/workflows/_release_rust.yml b/.github/workflows/_release_rust.yml index cf08252a83..96b780d91d 100644 --- a/.github/workflows/_release_rust.yml +++ b/.github/workflows/_release_rust.yml @@ -38,6 +38,10 @@ jobs: ~/.cargo/git/db/ key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} restore-keys: ${{ runner.os }}-cargo- + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: actions/checkout@v3 - run: | git config --global user.email "Haaroon@users.noreply.github.com" diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index ff57cafa33..5bb8f9c225 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -25,6 +25,10 @@ jobs: name: Checkout - name: Update Rust run: rustup update + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Cargo cache uses: 
Swatinem/rust-cache@v2 with: diff --git a/.github/workflows/test_python_workflow.yml b/.github/workflows/test_python_workflow.yml index b846e5753b..a46b120308 100644 --- a/.github/workflows/test_python_workflow.yml +++ b/.github/workflows/test_python_workflow.yml @@ -46,6 +46,10 @@ jobs: toolchain: 1.77.0 override: true components: rustfmt, clippy + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: webfactory/ssh-agent@v0.7.0 name: Load raphtory-disk_graph key with: diff --git a/.github/workflows/test_rust_disk_storage_workflow.yml b/.github/workflows/test_rust_disk_storage_workflow.yml index 942708407c..4fe4c62152 100644 --- a/.github/workflows/test_rust_disk_storage_workflow.yml +++ b/.github/workflows/test_rust_disk_storage_workflow.yml @@ -14,11 +14,12 @@ jobs: runs-on: '${{ matrix.os }}' env: CARGO_NET_GIT_FETCH_WITH_CLI: true + RUST_BACKTRACE: 1 strategy: matrix: include: - os: macos-latest - - os: ubuntu-latest + - os: ubuntu-20.04 - os: windows-latest steps: - uses: maxim-lobanov/setup-xcode@v1 @@ -36,7 +37,7 @@ jobs: override: true components: rustfmt, clippy - name: Free up space (ubuntu) - if: matrix.os == 'ubuntu-latest' + if: "contains(matrix.os, 'ubuntu')" run: | sudo rm -rf /usr/share/dotnet sudo rm -rf /usr/local/lib/android @@ -52,10 +53,12 @@ jobs: name: Cargo cache with: cache-all-crates: true - - name: Install bininstall - uses: cargo-bins/cargo-binstall@main + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Install nextest - run: cargo binstall -y --force cargo-nextest + uses: taiki-e/install-action@nextest - name: Activate pometry-storage in Cargo.toml run: make pull-storage - name: Run all Tests (disk_graph) diff --git a/.github/workflows/test_rust_workflow.yml b/.github/workflows/test_rust_workflow.yml index a780faa542..4e70b2f749 100644 --- a/.github/workflows/test_rust_workflow.yml +++ 
b/.github/workflows/test_rust_workflow.yml @@ -14,11 +14,12 @@ jobs: runs-on: '${{ matrix.os }}' env: CARGO_NET_GIT_FETCH_WITH_CLI: true + RUST_BACKTRACE: 1 strategy: matrix: include: - os: macos-latest - - os: ubuntu-latest + - os: ubuntu-20.04 - os: windows-latest steps: - uses: maxim-lobanov/setup-xcode@v1 @@ -36,12 +37,16 @@ jobs: override: true components: rustfmt, clippy - name: Free up space (ubuntu) - if: matrix.os == 'ubuntu-latest' + if: "contains(matrix.os, 'ubuntu')" run: | sudo rm -rf /usr/share/dotnet sudo rm -rf /usr/local/lib/android sudo rm -rf /opt/ghc sudo rm -rf "$AGENT_TOOLSDIRECTORY" + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: webfactory/ssh-agent@v0.7.0 name: Load pometry-storage key with: @@ -52,10 +57,8 @@ jobs: name: Cargo cache with: cache-all-crates: true - - name: Install bininstall - uses: cargo-bins/cargo-binstall@main - name: Install nextest - run: cargo binstall -y --force cargo-nextest + uses: taiki-e/install-action@nextest - name: Run all Tests (no disk_graph) env: RUSTFLAGS: -Awarnings @@ -80,6 +83,10 @@ jobs: toolchain: 1.77.0 override: true components: rustfmt, clippy + - name: Install Protoc + uses: arduino/setup-protoc@v3 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: Swatinem/rust-cache@v2 name: Cargo cache with: diff --git a/Cargo.lock b/Cargo.lock index 52b6e993f3..c2d869697f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,6 +183,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + [[package]] name = "arbitrary" version = "1.3.2" @@ -3148,6 +3154,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "multiversion" version = "0.7.4" @@ -4042,6 +4054,16 @@ dependencies = [ "yansi 0.5.1", ] +[[package]] +name = "prettyplease" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +dependencies = [ + "proc-macro2", + "syn 2.0.66", +] + [[package]] name = "proc-macro-crate" version = "1.3.1" @@ -4103,6 +4125,59 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.66", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "psm" version = "0.1.21" @@ -4369,6 +4444,7 @@ dependencies = [ "async-trait", "bincode", "bytemuck", + "bytes", "bzip2", "chrono", "csv", @@ -4398,6 +4474,9 @@ dependencies = [ "pometry-storage", "pretty_assertions", "proptest", + "prost", + 
"prost-build", + "prost-types", "pyo3", "quad-rand", "quickcheck 1.0.3", diff --git a/Cargo.toml b/Cargo.toml index 3a44fb3bab..0b1bbcfef1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,11 @@ url = "2.2" base64-compat = { package = "base64-compat", version = "1.0.0" } time = "0.3.36" +prost = "0.12" +prost-types = "0.12" +bytes = "1.6.0" +prost-build = "0.12" + lazy_static = "1.4.0" pest = "2.7.8" pest_derive = "2.7.8" diff --git a/python/Cargo.toml b/python/Cargo.toml index 459ffb9897..f7fdd0b837 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -19,7 +19,7 @@ crate-type = ["cdylib"] [dependencies] pyo3 = { workspace = true } -raphtory_core = { path = "../raphtory", version = "0.9.3", features = ["python", "search", "vectors"], package = "raphtory" } +raphtory_core = { path = "../raphtory", version = "0.9.3", features = ["python", "search", "vectors", "proto"], package = "raphtory" } raphtory-graphql = { path = "../raphtory-graphql", version = "0.9.3",features = ["python"] } serde_json = { workspace = true } reqwest = { workspace = true } diff --git a/python/tests/test_graphdb.py b/python/tests/test_graphdb.py index 402fdd5099..3b0b5ac18f 100644 --- a/python/tests/test_graphdb.py +++ b/python/tests/test_graphdb.py @@ -19,6 +19,7 @@ import os import shutil import numpy as np +import pickle base_dir = Path(__file__).parent edges = [(1, 1, 2), (2, 1, 3), (-1, 2, 1), (0, 1, 1), (7, 3, 2), (1, 1, 1)] @@ -59,6 +60,29 @@ def test_graph_len_edge_len(): assert g.count_edges() == 5 +def test_graph_pickle(): + g = create_graph() + # pickle graph + with tempfile.TemporaryDirectory() as tmpdirname: + pickle.dump(g, open(tmpdirname + "/graph.p", "wb")) + # unpickle graph + g2 = pickle.load(open(tmpdirname + "/graph.p", "rb")) + + assert g2.count_nodes() == 3 + assert g2.count_edges() == 5 + + +def test_persistent_graph_pickle(): + g = create_graph_with_deletions() + # pickle graph + with tempfile.TemporaryDirectory() as tmpdirname: + pickle.dump(g, 
open(tmpdirname + "/graph.p", "wb")) + # unpickle graph + g2 = pickle.load(open(tmpdirname + "/graph.p", "rb")) + + assert g2.count_nodes() == 3 + assert g2.count_edges() == 5 + def test_id_iterable(): g = create_graph() diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index 1304010458..d01c64f288 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -53,7 +53,10 @@ mod graphql_test { }; use serde_json::json; use std::collections::{HashMap, HashSet}; - use tempfile::{tempdir, TempDir}; + use tempfile::tempdir; + + #[cfg(feature = "storage")] + use tempfile::TempDir; #[tokio::test] async fn search_for_gandalf_query() { @@ -266,7 +269,6 @@ mod graphql_test { let req = Request::new(prop_has_key_filter); let res = schema.execute(req).await; - let data = res.data.into_json().unwrap(); let expected = json!({ "graph": { "properties": { diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml index 93dcf2c83d..6b7994491d 100644 --- a/raphtory/Cargo.toml +++ b/raphtory/Cargo.toml @@ -67,8 +67,6 @@ polars-arrow = { workspace = true, optional = true } polars-parquet = { workspace = true, optional = true } polars-utils = { workspace = true, optional = true } kdam = { workspace = true, optional = true } - -# disk storage optional dependencies memmap2 = { workspace = true, optional = true } ahash = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } @@ -77,6 +75,10 @@ rpds = { workspace = true, optional = true } thread_local = { workspace = true, optional = true } pometry-storage = { workspace = true, optional = true } +prost = { workspace = true, optional = true} +prost-types = { workspace = true, optional = true} +bytes = { workspace = true, optional = true} + [dev-dependencies] csv = { workspace = true } pretty_assertions = { workspace = true } @@ -89,6 +91,9 @@ dotenv = { workspace = true } # for vector testing streaming-stats = { workspace = true } proptest = { workspace = true } 
+[build-dependencies] +prost-build = { workspace = true, optional = true} + [features] default = [] # Enables the graph loader io module @@ -113,6 +118,7 @@ python = [ "arrow", "search", "vectors", + "proto", "dep:pyo3", "dep:num", "dep:display-error-chain", @@ -141,3 +147,11 @@ arrow = [ "dep:polars-parquet", "polars-parquet?/compression" ] + +proto = [ + "dep:prost", + "dep:prost-types", + "dep:bytes", + "dep:prost-build", + "dep:memmap2", +] \ No newline at end of file diff --git a/raphtory/build.rs b/raphtory/build.rs new file mode 100644 index 0000000000..1ff0547e1d --- /dev/null +++ b/raphtory/build.rs @@ -0,0 +1,11 @@ +use std::io::Result; +#[cfg(feature = "proto")] +fn main() -> Result<()> { + prost_build::compile_protos(&["src/graph.proto"], &["src/"])?; + Ok(()) +} + +#[cfg(not(feature = "proto"))] +fn main() -> Result<()> { + Ok(()) +} diff --git a/raphtory/src/core/utils/errors.rs b/raphtory/src/core/utils/errors.rs index ac789824aa..95ac04c7ee 100644 --- a/raphtory/src/core/utils/errors.rs +++ b/raphtory/src/core/utils/errors.rs @@ -132,6 +132,14 @@ pub enum GraphError { #[error("Illegal set error {0}")] IllegalSet(String), + + #[cfg(feature = "proto")] + #[error("Protobuf decode error {0}")] + DecodeError(#[from] prost::DecodeError), + + #[cfg(feature = "proto")] + #[error("Protobuf encode error {0}")] + EncodeError(#[from] prost::EncodeError), } impl GraphError { diff --git a/raphtory/src/db/api/view/mod.rs b/raphtory/src/db/api/view/mod.rs index 37c908afa9..c0587dda6c 100644 --- a/raphtory/src/db/api/view/mod.rs +++ b/raphtory/src/db/api/view/mod.rs @@ -6,6 +6,8 @@ pub(crate) mod internal; mod layer; pub(crate) mod node; mod reset_filter; +#[cfg(feature = "proto")] +pub mod serialise; pub(crate) mod time; pub(crate) use edge::BaseEdgeViewOps; diff --git a/raphtory/src/db/api/view/serialise.rs b/raphtory/src/db/api/view/serialise.rs new file mode 100644 index 0000000000..4e7b19b035 --- /dev/null +++ b/raphtory/src/db/api/view/serialise.rs @@ -0,0
+1,1070 @@ +use std::{fs::File, io::Write, path::Path, sync::Arc}; + +use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike}; +use prost::{decode_length_delimiter, encode_length_delimiter, Message}; +use raphtory_api::core::{ + entities::VID, + storage::{arc_str::ArcStr, timeindex::TimeIndexEntry}, +}; + +use crate::{ + core::{ + entities::{properties::props::PropMapper, LayerIds}, + utils::errors::GraphError, + DocumentInput, Lifespan, PropType, + }, + db::{ + api::{ + mutation::internal::{ + InternalAdditionOps, InternalDeletionOps, InternalPropertyAdditionOps, + }, + storage::nodes::node_storage_ops::NodeStorageOps, + }, + graph::views::deletion_graph::PersistentGraph, + }, + prelude::*, + serialise::{ + self, lifespan, prop, + properties_meta::{self, PropName}, + AddEdge, AddNode, DelEdge, Dict, GraphConstProps, NdTime, PropPair, UpdateEdgeConstProps, + }, +}; + +use super::GraphViewOps; + +pub trait StableEncoder { + fn encode_to_vec(&self) -> Result<Vec<u8>, GraphError>; + + fn stable_serialise(&self, path: impl AsRef<Path>) -> Result<(), GraphError> { + let mut file = File::create(path)?; + let bytes = self.encode_to_vec()?; + file.write_all(&bytes)?; + + Ok(()) + } +} + +pub trait StableDecode: Default { + fn decode_from_bytes(bytes: &[u8]) -> Result<Self, GraphError>; + fn decode(path: impl AsRef<Path>) -> Result<Self, GraphError> { + let file = File::open(path)?; + let buf = unsafe { memmap2::MmapOptions::new().map(&file)?
}; + let bytes = buf.as_ref(); + Self::decode_from_bytes(bytes) + } +} + +fn as_proto_prop_type(p_type: &PropType) -> properties_meta::PropType { + match p_type { + PropType::Str => properties_meta::PropType::Str, + PropType::U8 => properties_meta::PropType::U8, + PropType::U16 => properties_meta::PropType::U16, + PropType::U32 => properties_meta::PropType::U32, + PropType::I32 => properties_meta::PropType::I32, + PropType::I64 => properties_meta::PropType::I64, + PropType::U64 => properties_meta::PropType::U64, + PropType::F32 => properties_meta::PropType::F32, + PropType::F64 => properties_meta::PropType::F64, + PropType::Bool => properties_meta::PropType::Bool, + PropType::List => properties_meta::PropType::List, + PropType::Map => properties_meta::PropType::Map, + PropType::NDTime => properties_meta::PropType::NdTime, + PropType::DTime => properties_meta::PropType::DTime, + PropType::Graph => properties_meta::PropType::Graph, + PropType::PersistentGraph => properties_meta::PropType::PersistentGraph, + PropType::Document => properties_meta::PropType::Document, + _ => unimplemented!("Empty prop types not supported!"), + } +} + +fn as_prop_type(p_type: properties_meta::PropType) -> PropType { + match p_type { + properties_meta::PropType::Str => PropType::Str, + properties_meta::PropType::U8 => PropType::U8, + properties_meta::PropType::U16 => PropType::U16, + properties_meta::PropType::U32 => PropType::U32, + properties_meta::PropType::I32 => PropType::I32, + properties_meta::PropType::I64 => PropType::I64, + properties_meta::PropType::U64 => PropType::U64, + properties_meta::PropType::F32 => PropType::F32, + properties_meta::PropType::F64 => PropType::F64, + properties_meta::PropType::Bool => PropType::Bool, + properties_meta::PropType::List => PropType::List, + properties_meta::PropType::Map => PropType::Map, + properties_meta::PropType::NdTime => PropType::NDTime, + properties_meta::PropType::DTime => PropType::DTime, + properties_meta::PropType::Graph => 
PropType::Graph, + properties_meta::PropType::PersistentGraph => PropType::PersistentGraph, + properties_meta::PropType::Document => PropType::Document, + } +} + +fn collect_prop_names<'a>( + names: impl Iterator, + prop_mapper: &'a PropMapper, +) -> Vec { + names + .enumerate() + .map(|(prop_id, name)| { + let prop_type = prop_mapper + .get_dtype(prop_id) + .expect("Failed to get prop type"); + PropName { + name: name.to_string(), + p_type: as_proto_prop_type(&prop_type).into(), + } + }) + .collect() +} + +impl<'graph, G: GraphViewOps<'graph>> StableEncoder for G { + fn encode_to_vec(&self) -> Result, GraphError> { + let mut graph = serialise::GraphMeta::default(); + + // const graph properties + let (names, properties): (Vec<_>, Vec<_>) = self + .const_prop_ids() + .filter_map(|id| { + let prop = self.get_const_prop(id)?; + let prop_name = self.get_const_prop_name(id); + Some(( + prop_name.to_string(), + PropPair { + key: id as u64, + value: Some(as_proto_prop(&prop).expect("Failed to convert prop")), + }, + )) + }) + .unzip(); + + graph.const_properties = Some(GraphConstProps { names, properties }); + + // temporal graph properties + let prop_names = self + .temporal_prop_keys() + .into_iter() + .map(|n| n.to_string()) + .collect::>(); + + let (ts, props): (Vec<_>, Vec<_>) = self + .temporal_prop_ids() + .flat_map(|id| { + let prop_t = self.temporal_history(id); + let props = self.temporal_values(id); + props.into_iter().zip(prop_t).map(move |(prop, t)| { + let prop = as_proto_prop(&prop).expect("Failed to convert prop"); + ( + t, + PropPair { + key: id as u64, + value: Some(prop), + }, + ) + }) + }) + .unzip(); + + graph.temp_properties = Some(serialise::GraphTempProps { + names: prop_names, + times: ts, + properties: props, + }); + + graph.layers = self + .unique_layers() + .map(|l_name| l_name.to_string()) + .collect(); + graph.node_types = self + .get_all_node_types() + .into_iter() + .map(|s| s.to_string()) + .collect(); + + let n_const_meta = 
&self.node_meta().const_prop_meta(); + let n_temporal_meta = &self.node_meta().temporal_prop_meta(); + let e_const_meta = &self.edge_meta().const_prop_meta(); + let e_temporal_meta = &self.edge_meta().temporal_prop_meta(); + + graph.meta = Some(serialise::PropertiesMeta { + nodes: Some(properties_meta::PropNames { + constant: collect_prop_names(n_const_meta.get_keys().iter(), n_const_meta), + temporal: collect_prop_names(n_temporal_meta.get_keys().iter(), n_temporal_meta), + }), + edges: Some(properties_meta::PropNames { + constant: collect_prop_names(e_const_meta.get_keys().iter(), e_const_meta), + temporal: collect_prop_names(e_temporal_meta.get_keys().iter(), e_temporal_meta), + }), + }); + + graph.nodes = self + .nodes() + .into_iter() + .map(|n: crate::db::graph::node::NodeView| { + let gid = n.id(); + let vid = n.node; + let node = self.core_node_entry(vid); + let name = node.as_ref().name().map(|n| n.to_string()); + serialise::Node { + gid, + vid: vid.0 as u64, + name, + } + }) + .collect::>(); + + let mut bytes = vec![]; + + graph.encode_length_delimited(&mut bytes)?; + + let mut add_nodes = vec![]; + let mut const_nodes_props = vec![]; + + for v in self.nodes().iter() { + let type_id = Some(v.node_type_id() as u64); + let id = v.node.0 as u64; + + for time in v.history() { + add_nodes.push(AddNode { + id, + properties: None, + type_id, + time, + }); + } + + for (prop_name, prop_view) in v.properties().temporal().iter() { + for (time, prop) in prop_view.iter() { + let key = self + .node_meta() + .temporal_prop_meta() + .get_id(&prop_name) + .unwrap(); + add_nodes.push(AddNode { + id, + properties: Some(as_prop_pair(key as u64, &prop)?), + type_id, + time, + }); + } + } + + for (prop_name, prop) in v.properties().constant() { + let key = self + .node_meta() + .const_prop_meta() + .get_id(&prop_name) + .unwrap(); + const_nodes_props.push(serialise::UpdateNodeConstProps { + id, + properties: Some(as_prop_pair(key as u64, &prop)?), + }); + } + } + + 
encode_length_delimiter(add_nodes.len(), &mut bytes)?; + for add_node in add_nodes { + add_node.encode_length_delimited(&mut bytes)?; + } + + encode_length_delimiter(const_nodes_props.len(), &mut bytes)?; + for const_node_props in const_nodes_props { + const_node_props.encode_length_delimited(&mut bytes)?; + } + + let mut const_edges_props = vec![]; + let mut edges = vec![]; + let mut del_edges = vec![]; + + for e in self.edges() { + let src = e.src().node.0 as u64; + let dst = e.dst().node.0 as u64; + // FIXME: this needs to be verified + for ee in e.explode_layers() { + let layer_id = *ee.edge.layer().expect("exploded layers"); + + for (prop_name, prop) in ee.properties().constant() { + let key = self + .edge_meta() + .const_prop_meta() + .get_id(&prop_name) + .unwrap(); + const_edges_props.push(serialise::UpdateEdgeConstProps { + src, + dst, + layer_id: layer_id as u64, + properties: Some(as_prop_pair(key as u64, &prop)?), + }); + } + + for ee in ee.explode() { + edges.push(AddEdge { + src, + dst, + properties: None, + time: ee.time().expect("exploded edge"), + layer_id: Some(layer_id as u64), + }); + + for (prop_name, prop_view) in ee.properties().temporal() { + for (time, prop) in prop_view.iter() { + let key = self + .edge_meta() + .temporal_prop_meta() + .get_id(&prop_name) + .unwrap(); + edges.push(AddEdge { + src, + dst, + properties: Some(as_prop_pair(key as u64, &prop)?), + time, + layer_id: Some(layer_id as u64), + }); + } + } + + for time in ee.deletions() { + del_edges.push(DelEdge { + src, + dst, + time, + layer_id: Some(layer_id as u64), + }); + } + } + } + } + + encode_length_delimiter(edges.len(), &mut bytes)?; + for edge in edges { + edge.encode_length_delimited(&mut bytes)?; + } + + encode_length_delimiter(del_edges.len(), &mut bytes)?; + for del_edge in del_edges { + del_edge.encode_length_delimited(&mut bytes)?; + } + + encode_length_delimiter(const_edges_props.len(), &mut bytes)?; + for const_edge_props in const_edges_props { + 
const_edge_props.encode_length_delimited(&mut bytes)?; + } + + Ok(bytes) + } +} + +impl< + 'graph, + G: InternalAdditionOps + + GraphViewOps<'graph> + + InternalPropertyAdditionOps + + InternalDeletionOps + + Default, + > StableDecode for G +{ + fn decode_from_bytes(mut buf: &[u8]) -> Result<Self, GraphError> { + let graph = G::default(); + let g = serialise::GraphMeta::decode_length_delimited(&mut buf) + .expect("Failed to decode graph"); + + // constant graph properties + if let Some(meta) = g.const_properties.as_ref() { + for (name, prop_pair) in meta.names.iter().zip(meta.properties.iter()) { + let id = graph.graph_meta().resolve_property(name, true); + assert_eq!(id, prop_pair.key as usize); + + let prop = prop_pair.value.as_ref().and_then(|p| p.value.as_ref()); + let prop = graph.process_prop_value(as_prop_value(prop)); + graph.graph_meta().add_constant_prop(id, prop)?; + } + } + + if let Some(meta) = g.temp_properties.as_ref() { + for name in meta.names.iter() { + graph.graph_meta().resolve_property(name, false); + } + + for (time, prop_pair) in meta.times.iter().zip(meta.properties.iter()) { + let id = prop_pair.key as usize; + let prop = prop_pair.value.as_ref().and_then(|p| p.value.as_ref()); + let prop = graph.process_prop_value(as_prop_value(prop)); + graph + .graph_meta() + .add_prop(TimeIndexEntry::from(*time), id, prop)?; + } + } + + // align the nodes + for node in g.nodes { + let l_vid = graph.resolve_node(node.gid, node.name.as_deref()); + assert_eq!(l_vid, VID(node.vid as usize)); + } + + // align the node types + for (type_id, type_name) in g.node_types.iter().enumerate() { + let n_id = graph.node_meta().get_or_create_node_type_id(type_name); + assert_eq!(n_id, type_id); + } + + // align the edge layers + for (layer_id, layer) in g.layers.iter().enumerate() { + let l_id = graph.resolve_layer(Some(layer)); + assert_eq!(l_id, layer_id); + } + + // align the node properties + if let Some(meta) = g.meta.as_ref().and_then(|m| m.nodes.as_ref()) { + for PropName {
name, p_type } in &meta.constant { + let p_type = properties_meta::PropType::try_from(*p_type).unwrap(); + graph + .node_meta() + .resolve_prop_id(&name, as_prop_type(p_type), true)?; + } + + for PropName { name, p_type } in &meta.temporal { + let p_type = properties_meta::PropType::try_from(*p_type).unwrap(); + graph + .node_meta() + .resolve_prop_id(&name, as_prop_type(p_type), false)?; + } + } + + // align the edge properties + + if let Some(meta) = g.meta.as_ref().and_then(|m| m.edges.as_ref()) { + for PropName { name, p_type } in &meta.constant { + let p_type = properties_meta::PropType::try_from(*p_type).unwrap(); + graph + .edge_meta() + .resolve_prop_id(&name, as_prop_type(p_type), true)?; + } + + for PropName { name, p_type } in &meta.temporal { + let p_type = properties_meta::PropType::try_from(*p_type).unwrap(); + graph + .edge_meta() + .resolve_prop_id(&name, as_prop_type(p_type), false)?; + } + } + + let nodes_len = decode_length_delimiter(&mut buf)?; + + for node_res in (0..nodes_len).map(|_| AddNode::decode_length_delimited(&mut buf)) { + let AddNode { + id, + properties, + time, + type_id, + } = node_res?; + let v = VID(id as usize); + let props = properties + .as_ref() + .map(|p| as_prop(p)) + .map(|(id, prop)| (id, graph.process_prop_value(prop))) + .into_iter() + .collect(); + graph.internal_add_node( + TimeIndexEntry::from(time), + v, + props, + type_id.map(|id| id as usize).unwrap(), + )?; + } + + let const_nodes_len = decode_length_delimiter(&mut buf)?; + + for update_node_const_props in (0..const_nodes_len) + .map(|_| serialise::UpdateNodeConstProps::decode_length_delimited(&mut buf)) + { + let update_node_const_props = update_node_const_props?; + let vid = VID(update_node_const_props.id as usize); + let props = update_node_const_props + .properties + .iter() + .map(|prop| as_prop(prop)) + .map(|(id, prop)| (id, graph.process_prop_value(prop))) + .collect(); + graph.internal_update_constant_node_properties(vid, props)?; + } + + let edges_len 
= decode_length_delimiter(&mut buf)?; + + for add_edge in (0..edges_len).map(|_| AddEdge::decode_length_delimited(&mut buf)) { + let AddEdge { + src, + dst, + properties, + time, + layer_id, + } = add_edge?; + let src = VID(src as usize); + let dst = VID(dst as usize); + let props = properties + .as_ref() + .map(|p| as_prop(p)) + .map(|(id, prop)| (id, graph.process_prop_value(prop))) + .into_iter() + .collect(); + graph.internal_add_edge( + TimeIndexEntry::from(time), + src, + dst, + props, + layer_id.map(|id| id as usize).unwrap(), + )?; + } + + let del_edges_len = decode_length_delimiter(&mut buf)?; + + for del_edge in (0..del_edges_len).map(|_| DelEdge::decode_length_delimited(&mut buf)) { + let DelEdge { + src, + dst, + time, + layer_id, + } = del_edge?; + let src = VID(src as usize); + let dst = VID(dst as usize); + graph.internal_delete_edge( + TimeIndexEntry::from(time), + src, + dst, + layer_id.map(|id| id as usize).unwrap(), + )?; + } + + let const_edges_len = decode_length_delimiter(&mut buf)?; + + for update_edge in (0..const_edges_len) + .map(|_| serialise::UpdateEdgeConstProps::decode_length_delimited(&mut buf)) + { + let UpdateEdgeConstProps { + src, + dst, + properties, + layer_id, + } = update_edge?; + let src = VID(src as usize); + let dst = VID(dst as usize); + let eid = graph + .core_node_entry(src) + .find_edge(dst, &LayerIds::All) + .map(|e| e.pid()) + .unwrap(); + let props = properties + .iter() + .map(|prop| as_prop(prop)) + .map(|(id, prop)| (id, graph.process_prop_value(prop))) + .collect(); + graph.internal_update_constant_edge_properties(eid, layer_id as usize, props)?; + } + + Ok(graph) + } +} + +fn as_prop(prop_pair: &PropPair) -> (usize, Prop) { + let PropPair { key, value } = prop_pair; + let value = value.as_ref().expect("Missing prop value"); + let value = value.value.as_ref(); + let value = as_prop_value(value); + + (*key as usize, value) +} + +fn as_prop_value(value: Option<&prop::Value>) -> Prop { + let value = match 
value.expect("Missing prop value") { + prop::Value::BoolValue(b) => Prop::Bool(*b), + prop::Value::U8(u) => Prop::U8((*u).try_into().unwrap()), + prop::Value::U16(u) => Prop::U16((*u).try_into().unwrap()), + prop::Value::U32(u) => Prop::U32(*u), + prop::Value::I32(i) => Prop::I32(*i), + prop::Value::I64(i) => Prop::I64(*i), + prop::Value::U64(u) => Prop::U64(*u), + prop::Value::F32(f) => Prop::F32(*f), + prop::Value::F64(f) => Prop::F64(*f), + prop::Value::Str(s) => Prop::Str(ArcStr::from(s.as_str())), + prop::Value::Prop(props) => Prop::List(Arc::new( + props + .properties + .iter() + .map(|prop| as_prop_value(prop.value.as_ref())) + .collect(), + )), + prop::Value::Map(dict) => Prop::Map(Arc::new( + dict.map + .iter() + .map(|(k, v)| (ArcStr::from(k.as_str()), as_prop_value(v.value.as_ref()))) + .collect(), + )), + serialise::prop::Value::NdTime(ndt) => { + let NdTime { + year, + month, + day, + hour, + minute, + second, + nanos, + } = ndt; + let ndt = NaiveDateTime::new( + NaiveDate::from_ymd_opt(*year as i32, *month as u32, *day as u32).unwrap(), + NaiveTime::from_hms_nano_opt( + *hour as u32, + *minute as u32, + *second as u32, + *nanos as u32, + ) + .unwrap(), + ); + Prop::NDTime(ndt) + } + serialise::prop::Value::DTime(dt) => { + Prop::DTime(DateTime::parse_from_rfc3339(dt).unwrap().into()) + } + serialise::prop::Value::Graph(graph_bytes) => { + let g = Graph::decode_from_bytes(&graph_bytes).expect("Failed to decode graph"); + Prop::Graph(g) + } + serialise::prop::Value::PersistentGraph(graph_bytes) => { + let g = + PersistentGraph::decode_from_bytes(&graph_bytes).expect("Failed to decode graph"); + Prop::PersistentGraph(g) + } + serialise::prop::Value::DocumentInput(doc) => Prop::Document(DocumentInput { + content: doc.content.clone(), + life: doc + .life + .as_ref() + .map(|l| match l.l_type { + Some(lifespan::LType::Interval(lifespan::Interval { start, end })) => { + Lifespan::Interval { start, end } + } + Some(lifespan::LType::Event(lifespan::Event { 
time })) => { + Lifespan::Event { time } + } + None => Lifespan::Inherited, + }) + .unwrap_or(Lifespan::Inherited), + }), + }; + value +} + +fn as_prop_pair(key: u64, prop: &Prop) -> Result { + Ok(PropPair { + key, + value: Some(as_proto_prop(prop)?), + }) +} + +fn as_proto_prop(prop: &Prop) -> Result { + let value: prop::Value = match prop { + Prop::Bool(b) => prop::Value::BoolValue(*b), + Prop::U8(u) => prop::Value::U8((*u).into()), + Prop::U16(u) => prop::Value::U16((*u).into()), + Prop::U32(u) => prop::Value::U32(*u), + Prop::I32(i) => prop::Value::I32(*i), + Prop::I64(i) => prop::Value::I64(*i), + Prop::U64(u) => prop::Value::U64(*u), + Prop::F32(f) => prop::Value::F32(*f), + Prop::F64(f) => prop::Value::F64(*f), + Prop::Str(s) => prop::Value::Str(s.to_string()), + Prop::List(list) => { + let properties = list.iter().map(as_proto_prop).collect::>()?; + prop::Value::Prop(serialise::Props { properties }) + } + Prop::Map(map) => { + let map = map + .iter() + .map(|(k, v)| as_proto_prop(v).map(|v| (k.to_string(), v))) + .collect::>()?; + prop::Value::Map(Dict { map }) + } + Prop::NDTime(ndt) => { + let (year, month, day) = (ndt.date().year(), ndt.date().month(), ndt.date().day()); + let (hour, minute, second, nanos) = ( + ndt.time().hour(), + ndt.time().minute(), + ndt.time().second(), + ndt.time().nanosecond(), + ); + + let proto_ndt = NdTime { + year: year as u32, + month: month as u32, + day: day as u32, + hour: hour as u32, + minute: minute as u32, + second: second as u32, + nanos: nanos as u32, + }; + prop::Value::NdTime(proto_ndt) + } + Prop::DTime(dt) => { + prop::Value::DTime(dt.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)) + } + Prop::Graph(g) => { + let bytes = g.encode_to_vec()?; + prop::Value::Graph(bytes) + } + Prop::PersistentGraph(g) => { + let bytes = g.encode_to_vec()?; + prop::Value::PersistentGraph(bytes) + } + Prop::Document(doc) => { + let life = match doc.life { + Lifespan::Interval { start, end } => { + 
Some(lifespan::LType::Interval(lifespan::Interval { start, end })) + } + Lifespan::Event { time } => Some(lifespan::LType::Event(lifespan::Event { time })), + Lifespan::Inherited => None, + }; + prop::Value::DocumentInput(serialise::DocumentInput { + content: doc.content.clone(), + life: Some(serialise::Lifespan { l_type: life }), + }) + } + }; + + Ok(serialise::Prop { value: Some(value) }) +} + +#[cfg(test)] +mod proto_test { + use chrono::{DateTime, NaiveDateTime}; + + use crate::{ + core::DocumentInput, + db::{ + api::{mutation::DeletionOps, properties::internal::ConstPropertiesOps}, + graph::graph::assert_graph_equal, + }, + prelude::*, + }; + + use super::*; + + #[test] + fn node_no_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", NO_PROPS, None).unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn node_with_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", NO_PROPS, None).unwrap(); + g1.add_node(2, "Bob", [("age", Prop::U32(47))], None) + .unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn node_with_const_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", NO_PROPS, None).unwrap(); + let n1 = g1 + .add_node(2, "Bob", [("age", Prop::U32(47))], None) + .unwrap(); + + n1.update_constant_properties([("name", Prop::Str("Bob".into()))]) + .expect("Failed to update constant properties"); + + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn edge_no_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", 
NO_PROPS, None).unwrap(); + g1.add_node(2, "Bob", NO_PROPS, None).unwrap(); + g1.add_edge(3, "Alice", "Bob", NO_PROPS, None).unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn edge_no_props_delete() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new().persistent_graph(); + g1.add_edge(3, "Alice", "Bob", NO_PROPS, None).unwrap(); + g1.delete_edge(19, "Alice", "Bob", None).unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = PersistentGraph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + let edge = g2.edge("Alice", "Bob").expect("Failed to get edge"); + let deletions = edge.deletions().iter().copied().collect::>(); + assert_eq!(deletions, vec![19]); + } + + #[test] + fn edge_t_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", NO_PROPS, None).unwrap(); + g1.add_node(2, "Bob", NO_PROPS, None).unwrap(); + g1.add_edge(3, "Alice", "Bob", [("kind", "friends")], None) + .unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn edge_const_props() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + let e1 = g1.add_edge(3, "Alice", "Bob", NO_PROPS, None).unwrap(); + e1.update_constant_properties([("friends", true)], None) + .expect("Failed to update constant properties"); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn edge_layers() { + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_edge(7, "Alice", "Bob", NO_PROPS, Some("one")) + .unwrap(); + g1.add_edge(7, "Bob", "Charlie", [("friends", false)], Some("two")) + .unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = 
Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + } + + #[test] + fn test_all_the_t_props_on_node() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_node(1, "Alice", props.clone(), None).unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + let node = g2.node("Alice").expect("Failed to get node"); + + assert!(props.into_iter().all(|(name, expected)| { + node.properties() + .temporal() + .get(name) + .filter(|prop_view| { + let (t, prop) = prop_view.iter().next().expect("Failed to get prop"); + prop == expected && t == 1 + }) + .is_some() + })) + } + + #[test] + fn test_all_the_t_props_on_edge() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + g1.add_edge(1, "Alice", "Bob", props.clone(), None).unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + let edge = g2.edge("Alice", "Bob").expect("Failed to get edge"); + + assert!(props.into_iter().all(|(name, expected)| { + edge.properties() + .temporal() + .get(name) + .filter(|prop_view| { + let (t, prop) = prop_view.iter().next().expect("Failed to get prop"); + prop == expected && t == 1 + }) + .is_some() + })) + } + + #[test] + fn test_all_the_const_props_on_edge() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + let e = g1.add_edge(1, "Alice", "Bob", NO_PROPS, Some("a")).unwrap(); + e.update_constant_properties(props.clone(), Some("a")) + .expect("Failed to update constant properties"); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + let edge = g2 + 
.edge("Alice", "Bob") + .expect("Failed to get edge") + .layers("a") + .unwrap(); + + assert!(props.into_iter().all(|(name, expected)| { + edge.properties() + .constant() + .get(name) + .filter(|prop| prop == &expected) + .is_some() + })) + } + + #[test] + fn test_all_the_const_props_on_node() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let g1 = Graph::new(); + let n = g1.add_node(1, "Alice", NO_PROPS, None).unwrap(); + n.update_constant_properties(props.clone()) + .expect("Failed to update constant properties"); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + let node = g2.node("Alice").expect("Failed to get node"); + + assert!(props.into_iter().all(|(name, expected)| { + node.properties() + .constant() + .get(name) + .filter(|prop| prop == &expected) + .is_some() + })) + } + + #[test] + fn graph_const_properties() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let g1 = Graph::new(); + g1.add_constant_properties(props.clone()) + .expect("Failed to add constant properties"); + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + assert_graph_equal(&g1, &g2); + + props.into_iter().for_each(|(name, prop)| { + let id = g2.get_const_prop_id(name).expect("Failed to get prop id"); + assert_eq!(prop, g2.get_const_prop(id).expect("Failed to get prop")); + }); + } + + #[test] + fn graph_temp_properties() { + let mut props = vec![]; + write_props_to_vec(&mut props); + + let g1 = Graph::new(); + for t in 0..props.len() { + g1.add_properties(t as i64, (&props[t..t + 1]).to_vec()) + .expect("Failed to add constant properties"); + } + + let temp_file = tempfile::NamedTempFile::new().unwrap(); + g1.stable_serialise(&temp_file).unwrap(); + let g2 = Graph::decode(&temp_file).unwrap(); + 
assert_graph_equal(&g1, &g2); + + props + .into_iter() + .enumerate() + .for_each(|(expected_t, (name, expected))| { + for (t, prop) in g2 + .properties() + .temporal() + .get(name) + .expect("Failed to get prop view") + { + assert_eq!(prop, expected); + assert_eq!(t, expected_t as i64); + } + }); + } + + fn write_props_to_vec(props: &mut Vec<(&str, Prop)>) { + props.push(("name", Prop::Str("Alice".into()))); + props.push(("age", Prop::U32(47))); + props.push(("score", Prop::I32(27))); + props.push(("is_adult", Prop::Bool(true))); + props.push(("height", Prop::F32(1.75))); + props.push(("weight", Prop::F64(75.5))); + props.push(( + "children", + Prop::List(Arc::new(vec![ + Prop::Str("Bob".into()), + Prop::Str("Charlie".into()), + ])), + )); + props.push(( + "properties", + Prop::Map(Arc::new( + props + .iter() + .map(|(k, v)| (ArcStr::from(*k), v.clone())) + .collect(), + )), + )); + let fmt = "%Y-%m-%d %H:%M:%S"; + props.push(( + "time", + Prop::NDTime( + NaiveDateTime::parse_from_str("+10000-09-09 01:46:39", fmt) + .expect("Failed to parse time"), + ), + )); + + props.push(( + "dtime", + Prop::DTime( + DateTime::parse_from_rfc3339("2021-09-09T01:46:39Z") + .unwrap() + .into(), + ), + )); + + props.push(( + "doc", + Prop::Document(DocumentInput { + content: "Hello, World!".into(), + life: Lifespan::Interval { + start: -11i64, + end: 100i64, + }, + }), + )); + let graph = Graph::new(); + graph.add_edge(1, "a", "b", NO_PROPS, None).unwrap(); + props.push(("graph", Prop::Graph(graph))); + + let graph = Graph::new().persistent_graph(); + graph.add_edge(1, "a", "b", NO_PROPS, None).unwrap(); + graph.delete_edge(2, "a", "b", None).unwrap(); + props.push(("p_graph", Prop::PersistentGraph(graph))); + } +} diff --git a/raphtory/src/db/graph/graph.rs b/raphtory/src/db/graph/graph.rs index 32dd8a89cd..837531c271 100644 --- a/raphtory/src/db/graph/graph.rs +++ b/raphtory/src/db/graph/graph.rs @@ -88,9 +88,22 @@ pub fn assert_graph_equal< g1.count_temporal_edges(), 
g2.count_temporal_edges() ); - for n_id in g1.nodes().id().values() { - assert!(g2.has_node(n_id), "missing node {n_id}"); + for n1 in g1.nodes() { + assert!(g2.has_node(n1.id()), "missing node {}", n1.id()); + + let c1 = n1.properties().constant().into_iter().count(); + let t1 = n1.properties().temporal().into_iter().count(); + let check = g2 + .node(n1.id()) + .filter(|node| { + c1 == node.properties().constant().into_iter().count() + && t1 == node.properties().temporal().into_iter().count() + }) + .is_some(); + + assert!(check, "node {:?} properties mismatch", n1.id()); } + for e in g1.edges().explode() { // all exploded edges exist in other let e2 = g2 @@ -101,7 +114,20 @@ pub fn assert_graph_equal< "exploded edge {:?} not active as expected at time {}", e2.id(), e.time().unwrap() - ) + ); + + let c1 = e.properties().constant().into_iter().count(); + let t1 = e.properties().temporal().into_iter().count(); + let check = g2 + .edge(e.src().id(), e.dst().id()) + .filter(|ee| { + ee.active(e.time().expect("exploded")) + && c1 == e.properties().constant().into_iter().count() + && t1 == e.properties().temporal().into_iter().count() + }) + .is_some(); + + assert!(check, "edge {:?} properties mismatch", e.id()); } } diff --git a/raphtory/src/graph.proto b/raphtory/src/graph.proto new file mode 100644 index 0000000000..efe33d7dfa --- /dev/null +++ b/raphtory/src/graph.proto @@ -0,0 +1,167 @@ +syntax = "proto3"; + +package serialise; + +message Prop { + oneof value { + string str = 1; + uint32 u8 = 2; // Note: Protobuf does not have a UInt8 type, using uint32 instead. + uint32 u16 = 3; // Note: Protobuf does not have a UInt16 type, using uint32 instead. + int32 i32 = 4; + int64 i64 = 5; + uint32 u32 = 6; + uint64 u64 = 7; + float f32 = 8; + double f64 = 9; + bool bool_value = 10; + Props prop = 11; + Dict map = 12; + bytes graph = 13; // Assuming Data can be represented as bytes. + bytes persistentGraph = 14; // Assuming Data can be represented as bytes. 
+ NDTime ndTime = 15; + string dTime = 16; + DocumentInput documentInput = 17; + } +} + +message NDTime{ + uint32 year = 1; + uint32 month = 2; + uint32 day = 3; + uint32 hour = 4; + uint32 minute = 5; + uint32 second = 6; + uint32 nanos = 7; +} + +message Dict{ + map map = 1; +} + +message Props{ + repeated Prop properties = 1; +} + +message DocumentInput { + string content = 1; + Lifespan life = 2; +} + +message Lifespan { + oneof l_type { + Interval interval = 1; + Event event = 2; + // Inherited is represented by the absence of both interval and event. + } + + message Interval { + int64 start = 1; + int64 end = 2; + } + + message Event { + int64 time = 1; + } +} + +message GraphMeta{ + // Graph metadata + GraphConstProps constProperties = 1; + GraphTempProps tempProperties = 2; + PropertiesMeta meta = 3; + repeated string layers = 4; + repeated string node_types = 5; + repeated Node nodes = 6; +} + +message Node{ + uint64 gid = 1; + uint64 vid = 2; + optional string name = 3; +} + +message PropertiesMeta{ + PropNames nodes = 1; + PropNames edges = 2; + + message PropNames{ + repeated PropName constant = 1; + repeated PropName temporal = 2; + } + + message PropName{ + string name = 1; + PropType p_type = 2; + } + + enum PropType { + Str = 0; + U8 = 1; + U16 = 2; + I32 = 3; + I64 = 4; + U32 = 5; + U64 = 6; + F32 = 7; + F64 = 8; + Bool = 9; + List = 10; + Map = 11; + NDTime = 12; + Graph = 13; + PersistentGraph = 14; + Document = 15; + DTime = 16; + } + +} + +message AddEdge { + uint64 src = 1; + uint64 dst = 2; + PropPair properties = 3; + int64 time = 4; + optional uint64 layer_id = 5; +} + +message DelEdge{ + uint64 src = 1; + uint64 dst = 2; + int64 time = 3; + optional uint64 layer_id = 4; +} + +message AddNode { + uint64 id = 1; + optional PropPair properties = 3; + int64 time = 4; + optional uint64 type_id = 5; +} + +message UpdateNodeConstProps { + uint64 id = 1; + PropPair properties = 2; +} + +message UpdateEdgeConstProps { + uint64 src = 1; + uint64 dst 
= 2; + uint64 layer_id = 3; + PropPair properties = 4; +} + +message GraphConstProps{ + repeated string names = 1; + repeated PropPair properties = 2; +} + +message GraphTempProps{ + repeated string names = 1; + repeated int64 times = 2; + repeated PropPair properties = 3; +} + +message PropPair { + uint64 key = 1; + Prop value = 2; +} diff --git a/raphtory/src/lib.rs b/raphtory/src/lib.rs index 607392ce80..ad409ba5c5 100644 --- a/raphtory/src/lib.rs +++ b/raphtory/src/lib.rs @@ -129,6 +129,11 @@ pub const BINCODE_VERSION: u32 = 2u32; #[cfg(feature = "storage")] pub use polars_arrow as arrow2; +#[cfg(feature = "proto")] +mod serialise { + include!(concat!(env!("OUT_DIR"), "/serialise.rs")); +} + #[cfg(test)] mod test_utils { #[cfg(feature = "storage")] diff --git a/raphtory/src/python/graph/graph.rs b/raphtory/src/python/graph/graph.rs index 2d078de0f4..9dcd11bdc6 100644 --- a/raphtory/src/python/graph/graph.rs +++ b/raphtory/src/python/graph/graph.rs @@ -7,8 +7,15 @@ use crate::{ algorithms::components::LargestConnectedComponent, core::{entities::nodes::node_ref::NodeRef, utils::errors::GraphError}, db::{ - api::view::internal::{CoreGraphOps, DynamicGraph, IntoDynamic, MaterializedGraph}, - graph::{edge::EdgeView, node::NodeView, views::node_subgraph::NodeSubgraph}, + api::view::{ + internal::{CoreGraphOps, DynamicGraph, IntoDynamic, MaterializedGraph}, + serialise::{StableDecode, StableEncoder}, + }, + graph::{ + edge::EdgeView, + node::NodeView, + views::{deletion_graph::PersistentGraph, node_subgraph::NodeSubgraph}, + }, }, io::parquet_loaders::*, prelude::*, @@ -20,7 +27,10 @@ use crate::{ utils::{PyInputNode, PyTime}, }, }; -use pyo3::{prelude::*, types::PyBytes}; +use pyo3::{ + prelude::*, + types::{PyBytes, PyTuple}, +}; use raphtory_api::core::storage::arc_str::ArcStr; use std::{ collections::HashMap, @@ -30,7 +40,7 @@ use std::{ /// A temporal graph. 
#[derive(Clone)] -#[pyclass(name = "Graph", extends = PyGraphView)] +#[pyclass(name = "Graph", extends = PyGraphView, module = "raphtory")] pub struct PyGraph { pub graph: Graph, } @@ -111,6 +121,56 @@ impl PyGraph { } } +#[pyclass(module = "raphtory")] +pub enum PyGraphEncoder { + Graph, + PersistentGraph, +} + +#[pymethods] +impl PyGraphEncoder { + #[new] + pub fn new() -> Self { + PyGraphEncoder::Graph + } + + pub fn __call__(&self, bytes: Vec) -> PyResult { + Python::with_gil(|py| match self { + PyGraphEncoder::Graph => { + let g = Graph::decode_from_bytes(&bytes)?; + Ok(g.into_py(py)) + } + PyGraphEncoder::PersistentGraph => { + let g = PersistentGraph::decode_from_bytes(&bytes)?; + Ok(g.into_py(py)) + } + }) + } + + pub fn __setstate__(&mut self, state: &PyBytes) -> PyResult<()> { + match state.as_bytes() { + [0] => *self = PyGraphEncoder::Graph, + [1] => *self = PyGraphEncoder::PersistentGraph, + _ => { + return Err(PyErr::new::( + "Invalid state".to_string(), + )) + } + } + Ok(()) + } + pub fn __getstate__<'py>(&self, py: Python<'py>) -> PyResult<&'py PyBytes> { + match self { + PyGraphEncoder::Graph => Ok(PyBytes::new(py, &[0])), + PyGraphEncoder::PersistentGraph => Ok(PyBytes::new(py, &[1])), + } + } + + pub fn __getnewargs__<'a>(&self, py: Python<'a>) -> PyResult<&'a PyTuple> { + Ok(PyTuple::empty(py)) + } +} + /// A temporal graph. #[pymethods] impl PyGraph { @@ -125,6 +185,11 @@ impl PyGraph { ) } + fn __reduce__(&self) -> PyResult<(PyGraphEncoder, (Vec,))> { + let state = self.graph.encode_to_vec()?; + Ok((PyGraphEncoder::Graph, (state,))) + } + /// Adds a new node with the given id and properties to the graph. 
/// /// Arguments: diff --git a/raphtory/src/python/graph/graph_with_deletions.rs b/raphtory/src/python/graph/graph_with_deletions.rs index 8c50c0d9ad..433a48e337 100644 --- a/raphtory/src/python/graph/graph_with_deletions.rs +++ b/raphtory/src/python/graph/graph_with_deletions.rs @@ -10,7 +10,10 @@ use crate::{ db::{ api::{ mutation::{AdditionOps, PropertyAdditionOps}, - view::internal::{CoreGraphOps, MaterializedGraph}, + view::{ + internal::{CoreGraphOps, MaterializedGraph}, + serialise::StableEncoder, + }, }, graph::{edge::EdgeView, node::NodeView, views::deletion_graph::PersistentGraph}, }, @@ -28,7 +31,10 @@ use std::{ path::{Path, PathBuf}, }; -use super::{graph::PyGraph, io::pandas_loaders::*}; +use super::{ + graph::{PyGraph, PyGraphEncoder}, + io::pandas_loaders::*, +}; use crate::io::parquet_loaders::*; /// A temporal graph that allows edges and nodes to be deleted. @@ -99,6 +105,11 @@ impl PyPersistentGraph { ) } + fn __reduce__(&self) -> PyResult<(PyGraphEncoder, (Vec,))> { + let state = self.graph.encode_to_vec()?; + Ok((PyGraphEncoder::PersistentGraph, (state,))) + } + /// Adds a new node with the given id and properties to the graph. /// /// Arguments: diff --git a/raphtory/src/python/packages/base_modules.rs b/raphtory/src/python/packages/base_modules.rs index 8741fda574..694f3fecdc 100644 --- a/raphtory/src/python/packages/base_modules.rs +++ b/raphtory/src/python/packages/base_modules.rs @@ -9,7 +9,7 @@ use crate::{ algorithm_result::AlgorithmResult, edge::{PyDirection, PyEdge, PyMutableEdge}, edges::PyEdges, - graph::PyGraph, + graph::{PyGraph, PyGraphEncoder}, graph_with_deletions::PyPersistentGraph, index::GraphIndex, node::{PyMutableNode, PyNode, PyNodes}, @@ -31,6 +31,7 @@ pub fn add_raphtory_classes(m: &PyModule) -> PyResult<()> { add_classes!( m, PyGraph, + PyGraphEncoder, PyPersistentGraph, PyNode, PyNodes,