Skip to content

Commit

Permalink
Added 'hard deletion' semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
miratepuffin committed Oct 23, 2023
1 parent 3365517 commit 40edfc4
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 19 deletions.
4 changes: 1 addition & 3 deletions raphtory/src/algorithms/community_detection/in_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
},
};
use std::{
cmp,
collections::{HashMap, HashSet},
};

Expand All @@ -42,7 +41,6 @@ where
let ctx: Context<G, ComputeStateVec> = graph.into();
let step1 = ATask::new(move |vv: &mut EvalVertexView<'_, G, _, _>| {
let mut in_components = HashSet::new();
let id = vv.id();
let mut to_check_stack = Vec::new();
vv.in_neighbours().id().for_each(|id| {
in_components.insert(id);
Expand Down Expand Up @@ -127,7 +125,7 @@ mod components_test {
correct.insert("6".to_string(), Some(vec![1, 2, 4, 5]));
correct.insert("7".to_string(), Some(vec![1, 2, 4, 5]));
correct.insert("8".to_string(), Some(vec![1, 2, 5]));
let mut map: HashMap<String, Option<Vec<u64>>> = results
let map: HashMap<String, Option<Vec<u64>>> = results
.into_iter()
.map(|(k, v)| {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
},
};
use std::{
cmp,
collections::{HashMap, HashSet},
};

Expand Down Expand Up @@ -45,7 +44,6 @@ where
let ctx: Context<G, ComputeStateVec> = graph.into();
let step1 = ATask::new(move |vv: &mut EvalVertexView<'_, G, _, _>| {
let mut out_components = HashSet::new();
let id = vv.id();
let mut to_check_stack = Vec::new();
vv.out_neighbours().id().for_each(|id| {
out_components.insert(id);
Expand Down Expand Up @@ -130,7 +128,7 @@ mod components_test {
correct.insert("6".to_string(), Some(vec![]));
correct.insert("7".to_string(), Some(vec![]));
correct.insert("8".to_string(), Some(vec![]));
let mut map: HashMap<String, Option<Vec<u64>>> = results
let map: HashMap<String, Option<Vec<u64>>> = results
.into_iter()
.map(|(k, v)| {
(
Expand Down
127 changes: 114 additions & 13 deletions raphtory/src/db/graph/views/deletion_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::{
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GraphWithDeletions {
graph: Arc<InternalGraph>,
hard_deletions:bool
}

impl Static for GraphWithDeletions {}
Expand All @@ -45,6 +46,7 @@ impl From<InternalGraph> for GraphWithDeletions {
fn from(value: InternalGraph) -> Self {
Self {
graph: Arc::new(value),
hard_deletions: false
}
}
}
Expand All @@ -62,7 +64,79 @@ impl Display for GraphWithDeletions {
}

impl GraphWithDeletions {
fn edge_alive_at(&self, e: &EdgeStore, t: i64, layer_ids: &LayerIds) -> bool {

pub fn set_hard_deletions(&self) -> GraphWithDeletions {
Self {
graph: self.graph.clone(),
hard_deletions: true
}
}

pub fn set_soft_deletions(&self) -> GraphWithDeletions {
Self {
graph: self.graph.clone(),
hard_deletions: false
}
}

fn edge_alive_at(&self, e: &EdgeStore, start: i64,end: i64, layer_ids: &LayerIds) -> bool {
if self.hard_deletions {
self.hard_edge_alive_at(e,end,layer_ids)
}
else {
self.soft_edge_alive_at(e,start,layer_ids)
}
}

//function for semantics where an edge is only included if the last update was an addition
fn hard_edge_alive_at(&self, e: &EdgeStore, t: i64, layer_ids: &LayerIds) -> bool {
// FIXME: assumes additions are before deletions if at the same timestamp (need to have strict ordering/secondary index)
let (
latest_addition,
latest_deletion
) = match layer_ids {
LayerIds::None => return false,
LayerIds::All => (
e.additions()
.iter()
.flat_map(|v| v.range(i64::MIN..t.saturating_add(1)).last().copied())
.max(),
e.deletions()
.iter()
.flat_map(|v| v.range(i64::MIN..t).last().copied())
.max(),
),
LayerIds::One(l_id) => (
e.additions()
.get(*l_id)
.and_then(|v| v.range(i64::MIN..t.saturating_add(1)).last().copied()),
e.deletions()
.get(*l_id)
.and_then(|v| v.range(i64::MIN..t).last().copied()),
),
LayerIds::Multiple(ids) => (
ids.iter()
.flat_map(|l_id| {
e.additions()
.get(*l_id)
.and_then(|v| v.range(i64::MIN..t.saturating_add(1)).last().copied())
})
.max(),
ids.iter()
.flat_map(|l_id| {
e.deletions()
.get(*l_id)
.and_then(|v| v.range(i64::MIN..t).last().copied())
})
.max(),
),
};
// None is less than any value (see test below)
latest_deletion<latest_addition
}

//function for 'softer' semantics where an edge is included in a window even if it was deleted part way through it
fn soft_edge_alive_at(&self, e: &EdgeStore, t: i64, layer_ids: &LayerIds) -> bool {
// FIXME: assumes additions are before deletions if at the same timestamp (need to have strict ordering/secondary index)
let (
first_addition,
Expand Down Expand Up @@ -130,7 +204,8 @@ impl GraphWithDeletions {
fn vertex_alive_at(
&self,
v: &VertexStore,
t: i64,
start: i64,
end: i64,
layers: &LayerIds,
edge_filter: Option<&EdgeFilter>,
) -> bool {
Expand All @@ -139,14 +214,15 @@ impl GraphWithDeletions {
.map(|eref| edges.get(eref.pid().into()))
.find(|e| {
edge_filter.map(|f| f(e, layers)).unwrap_or(true)
&& self.edge_alive_at(e, t, layers)
&& self.edge_alive_at(e, start,end, layers)
})
.is_some()
}

pub fn new() -> Self {
Self {
graph: Arc::new(InternalGraph::default()),
hard_deletions:false
}
}

Expand Down Expand Up @@ -213,6 +289,7 @@ impl InternalMaterialize for GraphWithDeletions {
fn new_base_graph(&self, graph: InternalGraph) -> MaterializedGraph {
MaterializedGraph::PersistentGraph(GraphWithDeletions {
graph: Arc::new(graph),
hard_deletions:false,
})
}

Expand Down Expand Up @@ -272,12 +349,19 @@ impl TimeSemantics for GraphWithDeletions {
edge_filter: Option<&EdgeFilter>,
) -> bool {
let v = self.graph.inner().storage.get_node(v);
v.active(w.clone()) || self.vertex_alive_at(&v, w.start, layer_ids, edge_filter)
v.active(w.clone()) || self.vertex_alive_at(&v, w.start,w.end, layer_ids, edge_filter)
}

fn include_edge_window(&self, e: &EdgeStore, w: Range<i64>, layer_ids: &LayerIds) -> bool {
// includes edge if it is alive at the start of the window or added during the window
e.active(layer_ids, w.clone()) || self.edge_alive_at(e, w.start, layer_ids)
if self.hard_deletions {
//includes edge if alive at the end of the window
self.edge_alive_at(e, w.start,w.end, layer_ids)
}
else{
//soft deletions
// includes edge if it is alive at the start of the window or added during the window
e.active(layer_ids, w.clone()) || self.edge_alive_at(e, w.start,w.end, layer_ids)
}
}

fn vertex_history(&self, v: VID) -> Vec<i64> {
Expand All @@ -290,7 +374,7 @@ impl TimeSemantics for GraphWithDeletions {

fn edge_exploded(&self, e: EdgeRef, layer_ids: LayerIds) -> BoxedIter<EdgeRef> {
//Fixme: Need support for duration on exploded edges
if self.edge_alive_at(&self.core_edge(e.pid()), i64::MIN, &layer_ids) {
if self.edge_alive_at(&self.core_edge(e.pid()), i64::MIN,i64::MIN, &layer_ids) {
Box::new(
iter::once(e.at(i64::MIN.into())).chain(self.graph.edge_window_exploded(
e,
Expand All @@ -315,7 +399,7 @@ impl TimeSemantics for GraphWithDeletions {
) -> BoxedIter<EdgeRef> {
// FIXME: Need better iterators on LockedView that capture the guard
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, w.start, &layer_ids) {
if self.edge_alive_at(&entry, w.start,w.end, &layer_ids) {
Box::new(
iter::once(e.at(w.start.into())).chain(self.graph.edge_window_exploded(
e,
Expand Down Expand Up @@ -352,7 +436,7 @@ impl TimeSemantics for GraphWithDeletions {
fn edge_earliest_time(&self, e: EdgeRef, layer_ids: LayerIds) -> Option<i64> {
e.time().map(|ti| *ti.t()).or_else(|| {
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, i64::MIN, &layer_ids) {
if self.edge_alive_at(&entry, i64::MIN,i64::MIN, &layer_ids) {
Some(i64::MIN)
} else {
self.edge_additions(e, layer_ids).first().map(|ti| *ti.t())
Expand All @@ -367,7 +451,7 @@ impl TimeSemantics for GraphWithDeletions {
layer_ids: LayerIds,
) -> Option<i64> {
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, w.start, &layer_ids) {
if self.edge_alive_at(&entry, w.start,w.end, &layer_ids) {
Some(w.start)
} else {
self.edge_additions(e, layer_ids).range(w).first_t()
Expand All @@ -388,7 +472,7 @@ impl TimeSemantics for GraphWithDeletions {
)),
None => {
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, i64::MAX, &layer_ids) {
if self.edge_alive_at(&entry, i64::MAX, i64::MIN, &layer_ids) {
Some(i64::MAX)
} else {
self.edge_deletions(e, layer_ids).last_t()
Expand Down Expand Up @@ -416,7 +500,7 @@ impl TimeSemantics for GraphWithDeletions {
)),
None => {
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, w.end - 1, &layer_ids) {
if self.edge_alive_at(&entry, w.end - 1,w.end - 1, &layer_ids) {
Some(w.end - 1)
} else {
self.edge_deletions(e, layer_ids).range(w).last_t()
Expand Down Expand Up @@ -537,7 +621,7 @@ impl TimeSemantics for GraphWithDeletions {
match prop {
Some(p) => {
let entry = self.core_edge(e.pid());
if self.edge_alive_at(&entry, start, &layer_ids) {
if self.edge_alive_at(&entry, start, end, &layer_ids) {
p.last_before(start.saturating_add(1))
.into_iter()
.map(|(_, v)| (start, v))
Expand Down Expand Up @@ -691,4 +775,21 @@ mod test_deletions {
assert!(g.window(2, 3).has_edge(3, 4, Layer::Default));
assert!(!g.window(3, 4).has_edge(3, 4, Layer::Default));
}

#[test]
fn test_hard_deletions() {
let g = GraphWithDeletions::new();
g.add_edge(1, 1, 2, [("test", "test")], None).unwrap();
g.delete_edge(10, 1, 2, None).unwrap();

assert_eq!(g.count_edges(),1);
let g2 = g.set_hard_deletions();
assert_eq!(g2.at(12).count_edges(), 0);
assert_eq!(g2.at(11).count_edges(), 0);
assert_eq!(g2.at(10).count_edges(), 0);
assert_eq!(g2.at(9).count_edges(), 1);
assert_eq!(g2.window(5,9).count_edges(), 1);
assert_eq!(g2.window(5,10).count_edges(), 1);
assert_eq!(g2.window(5,11).count_edges(), 0);
}
}
8 changes: 8 additions & 0 deletions raphtory/src/python/graph/graph_with_deletions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ impl PyGraphWithDeletions {
)
}


pub fn soft_deletions(&self,py:Python) -> PyObject {
self.graph.set_soft_deletions().into_py(py)
}

pub fn hard_deletions(&self,py:Python) -> PyObject {
self.graph.set_hard_deletions().into_py(py)
}
/// Adds a new vertex with the given id and properties to the graph.
///
/// Arguments:
Expand Down

0 comments on commit 40edfc4

Please sign in to comment.