Skip to content

Commit

Permalink
Avoid the early return and barrier desynchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 15, 2024
1 parent 8ff2bab commit b7a6c28
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 179 deletions.
198 changes: 90 additions & 108 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,43 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
auto &bfs_state = state.global_bfs_state;
auto &barrier = bfs_state->barrier;

do {
// std::cout << worker_id << ": Starting iterative path" << std::endl;
IterativePath();
// std::cout << worker_id << ": Finished iterative path" << std::endl;
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});

barrier->Wait();
do {
IterativePath();

if (worker_id == 0) {
// std::cout << worker_id << ": Starting reach detected" << std::endl;
ReachDetect();
// std::cout << worker_id << ": Finished reach detected" << std::endl;
}
// Synchronize after IterativePath
barrier->Wait();
if (worker_id == 0) {
bfs_state->ResetTaskIndex();
}
barrier->Wait();

barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});
if (worker_id == 0) {
ReachDetect();
}

barrier->Wait();
} while (bfs_state->change);
std::cout << "Worker " << worker_id << ": Waiting at barrier before ResetTaskIndex." << std::endl;
barrier->Wait();
if (worker_id == 0) {
// std::cout << worker_id << ": Starting path construction" << std::endl;
PathConstruction();
// std::cout << worker_id << ": Finished path construction" << std::endl;
bfs_state->ResetTaskIndex();
std::cout << "Worker " << worker_id << ": ResetTaskIndex completed." << std::endl;
}
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed barrier after ResetTaskIndex." << std::endl;
} while (bfs_state->change);

barrier->Wait();
if (worker_id == 0) {
std::cout << "Worker " << worker_id << " started path construction" << std::endl;
PathConstruction();
std::cout << "Worker " << worker_id << " finished path construction" << std::endl;
}

// Final synchronization before finishing
barrier->Wait();
std::cout << "Worker " << worker_id << " finishing task" << std::endl;
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;

event->FinishTask();
// std::cout << worker_id << ": Finished event" << std::endl;
return TaskExecutionResult::TASK_FINISHED;
}

bool PhysicalShortestPathTask::SetTaskRange() {
Expand All @@ -70,99 +76,75 @@ PhysicalShortestPathTask::PhysicalShortestPathTask(shared_ptr<Event> event_p, Cl
auto &parents_ve = bfs_state->parents_ve;
auto &change = bfs_state->change;

// Attempt to get a task range
bool has_tasks = SetTaskRange();
std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;


if (!SetTaskRange()) {
std::cout << "Worker " << worker_id << ": No more tasks at start." << std::endl;
return; // no more tasks
}
std::cout << "Worker " << worker_id << ": Clearing next array." << std::endl;
for (auto i = left; i < right; i++) {
next[i] = 0;
}
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed first barrier." << std::endl;

while (true) {
// std::cout << "Worker " << worker_id << ": Processing range [" << left << ", " << right << "]." << std::endl;
// Clear next array regardless of whether the worker has tasks
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
auto edge_id = edge_ids[offset];
{
auto start_time = std::chrono::high_resolution_clock::now();
{
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
}
auto end_time = std::chrono::high_resolution_clock::now();
std::cout << "Worker " << worker_id << ": Lock for element " << n
<< " held for " << std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count()
<< " microseconds." << std::endl;
next[n] |= visit[i];
}
for (auto l = 0; l < LANE_LIMIT; l++) {
if (parents_ve[n][l].GetV() == -1 && visit[i][l]) {
parents_ve[n][l] = {static_cast<int64_t>(i), edge_id};
}
}
}
}
next[i] = 0;
}
if (!SetTaskRange()) {
std::cout << "Worker " << worker_id << ": No more tasks found to explore." << std::endl;
break; // no more tasks
}
}
std::cout << "Worker " << worker_id << ": Waiting at second barrier." << std::endl;
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
bfs_state->ResetTaskIndex(); // Reset task index safely
});

barrier->Wait();
// std::cout << "Worker " << worker_id << ": Passed second barrier." << std::endl;

if (!SetTaskRange()) {
// Synchronize after clearing
barrier->Wait();
std::cout << "Worker " << worker_id << ": No more tasks at start." << std::endl;
return; // no more tasks
}

change = false;

do {
// std::cout << "Worker " << worker_id << ": Starting task range [" << left << ", " << right << "]." << std::endl;
for (auto i = left; i < right; i++) {
if (next[i].any()) {
// std::cout << "Worker " << worker_id << ": Processing node " << i << ", next[i]: "
// << next[i] << ", seen[i]: " << seen[i] << std::endl;

// Update `next` and `seen`
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
std::cout << "Worker " << worker_id << ": Passed first barrier." << std::endl;

// Main processing loop
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
auto edge_id = edge_ids[offset];
{
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
next[n] |= visit[i];
}
for (auto l = 0; l < LANE_LIMIT; l++) {
if (parents_ve[n][l].GetV() == -1 && visit[i][l]) {
parents_ve[n][l] = {static_cast<int64_t>(i), edge_id};
}
}
}
}
}

// std::cout << "Worker " << worker_id << ": Updated node " << i
// << ", prev_next: " << prev_next << ", next[i]: " << next[i]
// << ", seen[i]: " << seen[i] << ", change: " << change << std::endl;
}
// Check for a new task range
has_tasks = SetTaskRange();
if (!has_tasks) {
std::cout << "Worker " << worker_id << ": No more tasks found to explore." << std::endl;
}
}
} while (SetTaskRange());

// std::cout << "Worker " << worker_id << ": Completed tasks, waiting at barrier to reset task index." << std::endl;

barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index at barrier." << std::endl;
bfs_state->ResetTaskIndex(); // Reset task index safely
});

std::cout << "Worker " << worker_id << ": Waiting at third barrier." << std::endl;
// Synchronize at the end of the main processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();

barrier->Wait();
// Second processing stage (if needed)
has_tasks = SetTaskRange();
change = false;
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
}
}
has_tasks = SetTaskRange();
}

std::cout << "Worker " << worker_id << ": Passed third barrier." << std::endl;
}
// Synchronize again
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();
std::cout << "Worker " << worker_id << ": Passed second barrier." << std::endl;
}

void PhysicalShortestPathTask::ReachDetect() {
auto &bfs_state = state.global_bfs_state;
Expand Down
Loading

0 comments on commit b7a6c28

Please sign in to comment.