Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Rust APSP benchmark #53

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 280 additions & 0 deletions Rust/Savina/src/parallelism/Apsp.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/**
* Copyright (C) 2020 TU Dresden
*
* This benchmark implements a parallel all pairs shortest path algorithm. In
* order to split the workload, the large input matrix of size graph_size x
* graph_size is split into smaller blocks of size block_size x block_size. Each of
* the worker reactors (ApspFloydWarshallBlock) processes one of these blocks.
* The worker reactors are organized in the same matrix pattern, replication the
* structure of the blocks within the large input matrix. Each of the workers
* operates on its local block data, and sends results to all other workers in
* the same column or in the same row. The data from the neighbors is then used
* to compute the next intermediate result and to update the local state
* accordingly.
*
* @author Christian Menard
* @author Hannes Klein
* @author Johannes Hayeß
*/

target Rust {
build-type : Release,
cargo-features: [ "cli" ],
rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"],
};

import BenchmarkRunner from "../lib/BenchmarkRunner.lf";

reactor ApspFloydWarshallBlock(
bank_index: usize(0),
row_index: usize(0),
graph_size: usize(300),
block_size: usize(50),
dimension: usize(6)
) {

state bank_index(bank_index);
state row_index(row_index);
state graph_size(graph_size);
state block_size(block_size);
state dimension(dimension);

state num_neighbors: usize({=2 * (dimension - 1)=});
state row_offset: usize({=row_index * block_size=}); // row offset of the block of this reactor
state col_offset: usize({=bank_index * block_size=}); // column offset of the block of this reactor

state k: usize(0); // iteration counter
state reportedFinish: bool(false);

input start: Matrix<u64>;

input[dimension] frow_row: Matrix<u64>;
input[dimension] frow_col: Matrix<u64>;

output toNeighbors: Matrix<u64>;
output finished: unit;

logical action notify_neighbors: Matrix<u64>;

preamble {=
use crate::matrix::Matrix;

fn get_element_at(
row: usize,
col: usize,
row_ports: &Multiport<Matrix<u64>>,
col_ports: &Multiport<Matrix<u64>>,
ctx: &ReactionCtx,
block_size: usize,
row_index: usize,
bank_index: usize,
) -> u64 {
let dest_row = row / block_size;
let dest_col = col / block_size;
let local_row = row % block_size;
let local_col = col % block_size;

if dest_row == row_index {
*ctx.get_ref(&row_ports[dest_col])
.unwrap()
.get(local_row, local_col)
} else if dest_col == bank_index {
*ctx.get_ref(&col_ports[dest_row])
.unwrap()
.get(local_row, local_col)
} else {
panic!("Error: unexpected target location ({},{})", dest_col, dest_row);
}

}
=}

// @label block_start
reaction(start) -> notify_neighbors {=
// reset local state
self.k = 0;
self.reportedFinish = false;

// start execution
let matrix = ctx.get_ref(start).unwrap().clone();
ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap);
=}

reaction(notify_neighbors) -> toNeighbors {=
//notify all neighbors
ctx.set(toNeighbors, ctx.use_ref_opt(notify_neighbors, Clone::clone).unwrap());
=}

reaction(frow_row, frow_col) -> notify_neighbors, finished {=
// do nothing if complete
if self.k == self.graph_size {
return;
}

// perform computation
let mut matrix: Matrix<u64> = Matrix::new(self.block_size, self.block_size);
let bs = self.block_size;
let ri = self.row_index;
let bi = self.bank_index;

for i in 0..self.block_size {
for j in 0..self.block_size {
let gi = self.row_offset + i;
let gj = self.col_offset + j;

let result = get_element_at(gi, self.k, frow_row, frow_col, &ctx, bs, ri, bi)
+ get_element_at(self.k, gj, frow_row, frow_col, &ctx, bs, ri, bi);
matrix.set(i, j, result.min(get_element_at(gi, gj, frow_row, frow_col, &ctx, bs, ri, bi)));
}
}

// increment iteration count
self.k += 1;

if self.k == self.graph_size {
if self.bank_index == 0 && self.row_index == 0 {
debug!("{}", matrix);
}
ctx.set(finished, ());
}

// send the result to all neighbors in the next iteration
ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap);
=}
}

reactor ApspRow(
bank_index: usize(0),
block_size: usize(50),
num_nodes: usize(300),
dimension: usize(6),
dimension_sq: usize(36)
) {
preamble {=
use crate::matrix::Matrix;
=}

input start: Matrix<u64>;
output[dimension] finished: unit;

input[dimension_sq] frow_col: Matrix<u64>;
output[dimension] to_col: Matrix<u64>;

blocks = new[dimension] ApspFloydWarshallBlock(
row_index=bank_index,
block_size=block_size,
graph_size=num_nodes,
dimension=dimension
);

// connect all blocks within the row
(blocks.toNeighbors)+ -> blocks.frow_row;

// block output to all column neighbours
blocks.toNeighbors -> to_col;
// block input from all column neighbours
frow_col -> interleaved(blocks.frow_col);

// broadcast the incoming matrix to all blocks
(start)+ -> blocks.start;
// collect and forward finished signals from all blocks
blocks.finished -> finished;
}

reactor ApspMatrix(
block_size: usize(50),
num_nodes: usize(300),
dimension: usize(6),
dimension_sq: usize(36)
) {
preamble {=
use crate::matrix::Matrix;
=}
input start: Matrix<u64>;
output[dimension_sq] finished: unit;

rows = new[dimension] ApspRow(block_size=block_size, num_nodes=num_nodes, dimension=dimension, dimension_sq=dimension_sq);

// broadcast the incoming matrix to all rows
(start)+ -> rows.start;
// collect and forward finished signals from all blocks
rows.finished -> finished;

(rows.to_col)+ -> rows.frow_col;
}

main reactor (
num_iterations: usize(12),
max_edge_weight: usize(100),
block_size: usize(50),
num_nodes: usize(300)
) {
state num_iterations(num_iterations);
state max_edge_weight(max_edge_weight);
state block_size(block_size);
state num_nodes(num_nodes);

state num_blocks_finished: usize(0);

runner = new BenchmarkRunner(num_iterations=num_iterations);
matrix = new ApspMatrix(
block_size=block_size,
num_nodes=num_nodes,
dimension={=num_nodes / block_size=},
dimension_sq={=(num_nodes / block_size)*(num_nodes / block_size)=}
);

reaction(startup) {=
print_benchmark_info("ApspBenchmark");
print_args!(
"num_iterations",
self.num_iterations,
"max_edge_weight",
self.max_edge_weight,
"num_nodes",
self.num_nodes,
"block_size",
self.block_size
);
print_system_info();
=}

// @label dostart
reaction(runner.start) -> matrix.start {=
// reset local state
self.num_blocks_finished = 0;
let graph_data = generate_graph(self.num_nodes, self.max_edge_weight);
// start execution
ctx.set(matrix__start, graph_data);
=}

reaction (matrix.finished) -> runner.finished {=
self.num_blocks_finished += matrix__finished.iterate_set().count();
let dimension = self.num_nodes / self.block_size;
if self.num_blocks_finished == dimension * dimension {
ctx.set(runner__finished, ());
}
=}

preamble {=
use crate::matrix::Matrix;
use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}};
use crate::pseudo_random::PseudoRandomGenerator;
use std::os::raw::c_long;

fn generate_graph(n: usize, w: usize) -> Matrix<u64> {
let mut random = PseudoRandomGenerator::from(n as c_long);
let mut local_data: Matrix<u64> = Matrix::new(n, n);

for i in 0..n {
for j in (i+1)..n {
let r = u64::from(random.next_in_range(0..w as c_long)) + 1;
local_data.set(i, j, r);
local_data.set(j, i, r);
}
}

local_data
}
=}
}
10 changes: 10 additions & 0 deletions runner/conf/benchmark/savina_parallelism_apsp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,13 @@ targets:
num_workers: ["-D", "numNodes=<value>"]
block_size: ["-D", "blockSize=<value>"]
max_edge_weight: ["-D", "maxEdgeWeight=<value>"]
lf-rust:
copy_sources:
- "${bench_path}/Rust/Savina/src/lib"
- "${bench_path}/Rust/Savina/src/parallelism"
lf_file: "parallelism/Apsp.lf"
binary: "apsp"
run_args:
block_size: ["--main-block-size", "<value>"]
max_edge_weight: ["--main-max-edge-weight", "<value>"]
num_workers: ["--main-num-nodes", "<value>"]