Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Motifs tidy ii #1623

Merged
merged 17 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions raphtory/src/algorithms/motifs/global_temporal_three_node_motifs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ where
.collect();
let events = evv
.edges()
.explode()
.iter()
.sorted_by_key(|e| e.time_and_index())
.map(|e| e.explode())
.kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index())
.map(|edge| {
if edge.src().id() == evv.id() {
star_event(neigh_map[&edge.dst().id()], 1, edge.time().unwrap())
Expand Down Expand Up @@ -84,8 +84,9 @@ where
let events: Vec<TwoNodeEvent> = out
.iter()
.flat_map(|e| e.explode())
.chain(inc.iter().flat_map(|e| e.explode()))
.sorted_by_key(|e| e.time_and_index())
.merge_by(inc.iter().flat_map(|e| e.explode()), |e1, e2| {
e1.time_and_index() < e2.time_and_index()
})
.map(|e| {
two_node_event(
if e.src().id() == evv.id() { 1 } else { 0 },
Expand All @@ -111,10 +112,13 @@ pub fn triangle_motifs<G>(graph: &G, deltas: Vec<i64>, threads: Option<usize>) -
where
G: StaticGraphViewOps,
{
// Create K-Core graph to recursively remove nodes of degree < 2
let node_set = k_core_set(graph, 2, usize::MAX, None);
let g: NodeSubgraph<G> = graph.subgraph(node_set);
let mut ctx_sub: Context<NodeSubgraph<G>, ComputeStateVec> = Context::from(&g);
let kcore_subgraph: NodeSubgraph<G> = graph.subgraph(node_set);
let mut ctx_subgraph: Context<NodeSubgraph<G>, ComputeStateVec> =
Context::from(&kcore_subgraph);

// Triangle Accumulator
let neighbours_set = accumulators::hash_set::<u64>(0);

let tri_mc = deltas
Expand All @@ -124,14 +128,15 @@ where
let tri_clone = tri_mc.clone();

tri_mc.clone().iter().for_each(|mc| {
ctx_sub.global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst<usize, SumDef<usize>, 8>>(
*mc,
)
ctx_subgraph
.global_agg::<[usize; 8], [usize; 8], [usize; 8], ArrConst<usize, SumDef<usize>, 8>>(
*mc,
)
});

ctx_sub.agg(neighbours_set);
ctx_subgraph.agg(neighbours_set);

let step1 = ATask::new(move |u: &mut EvalNodeView<NodeSubgraph<G>, ()>| {
let neighbourhood_update_step = ATask::new(move |u: &mut EvalNodeView<NodeSubgraph<G>, ()>| {
for v in u.neighbours() {
if u.id() > v.id() {
v.update(&neighbours_set, u.id());
Expand All @@ -140,7 +145,7 @@ where
Step::Continue
});

let step2 = ATask::new(move |u: &mut EvalNodeView<NodeSubgraph<G>, ()>| {
let intersection_compute_step = ATask::new(move |u: &mut EvalNodeView<NodeSubgraph<G>, ()>| {
for v in u.neighbours() {
// Find triangles on the UV edge
if u.id() > v.id() {
Expand Down Expand Up @@ -172,14 +177,14 @@ where
.into_iter()
.sorted()
.permutations(2)
.flat_map(|e| {
.map(|e| {
u.graph()
.edge(*e.first().unwrap(), *e.get(1).unwrap())
.iter()
.flat_map(|edge| edge.explode())
.collect::<Vec<_>>()
})
.sorted_by_key(|e| e.time_and_index())
.kmerge_by(|e1, e2| e1.time_and_index() < e2.time_and_index())
.map(|e| {
let (src_id, dst_id) = (e.src().id(), e.dst().id());
let uid = u.id();
Expand Down Expand Up @@ -219,11 +224,11 @@ where
Step::Continue
});

let mut runner: TaskRunner<NodeSubgraph<G>, _> = TaskRunner::new(ctx_sub);
let mut runner: TaskRunner<NodeSubgraph<G>, _> = TaskRunner::new(ctx_subgraph);

runner.run(
vec![Job::new(step1)],
vec![Job::new(step2)],
vec![Job::new(neighbourhood_update_step)],
vec![Job::new(intersection_compute_step)],
None,
|egs, _, _, _| {
tri_mc.iter().map(|mc| egs.finalize::<[usize; 8], [usize;8], [usize; 8], ArrConst<usize,SumDef<usize>,8>>(mc)).collect_vec()
Expand Down Expand Up @@ -252,12 +257,11 @@ where
.collect_vec();

let star_clone = star_mc.clone();

star_mc.iter().for_each(|mc| ctx.global_agg(*mc));

let out1 = triangle_motifs(g, deltas.clone(), threads);
let triadic_motifs = triangle_motifs(g, deltas.clone(), threads);

let step1 = ATask::new(move |evv: &mut EvalNodeView<G, _>| {
let star_count_step = ATask::new(move |evv: &mut EvalNodeView<G, _>| {
let star_nodes = star_motif_count(evv, deltas.clone());
for (i, star) in star_nodes.iter().enumerate() {
evv.global_update(&star_mc[i], *star);
Expand All @@ -266,14 +270,12 @@ where
});

let mut runner: TaskRunner<G, _> = TaskRunner::new(ctx);
// let star_ref = &star_mc;

runner.run(
vec![],
vec![Job::new(step1)],
vec![Job::new(star_count_step)],
None,
|egs, _ , _ , _ | {
out1.iter().enumerate().map(|(i,tri)| {
triadic_motifs.iter().enumerate().map(|(i,tri)| {
let mut tmp = egs.finalize::<[usize; 32], [usize;32], [usize; 32], ArrConst<usize,SumDef<usize>,32>>(&star_clone[i])
.iter().copied()
.collect_vec();
Expand Down
Loading
Loading