Skip to content

Commit

Permalink
Passing all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Nov 15, 2024
1 parent b7a6c28 commit 3b60e94
Show file tree
Hide file tree
Showing 2 changed files with 2,798 additions and 273 deletions.
83 changes: 42 additions & 41 deletions src/core/operator/task/iterative_length_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,62 +62,63 @@ bool PhysicalIterativeTask::SetTaskRange() {
vector<int64_t> &e = state.csr->e;
auto &change = bfs_state->change;

if (!SetTaskRange()) {
return;
}
// Attempt to get a task range
bool has_tasks = SetTaskRange();
std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;

// clear next before each iteration
// Clear `next` array regardless of task availability
for (auto i = left; i < right; i++) {
next[i] = 0;
next[i] = 0;
}

// Synchronize after clearing
barrier->Wait();

while (true) {
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
next[n] |= visit[i];
}
// Main processing loop
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
{
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
next[n] |= visit[i];
}
}
}
}
}
if (!SetTaskRange()) {
break; // no more tasks
}
has_tasks = SetTaskRange();
}
change = false;
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});



// Synchronize at the end of the main processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();

if (!SetTaskRange()) {
return; // no more tasks
}
while (true) {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
// Check and process tasks for the next phase
has_tasks = SetTaskRange();
change = false;

while (has_tasks) {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
}
}
}
if (!SetTaskRange()) {
break; // no more tasks
}
has_tasks = SetTaskRange();
}
barrier->Wait([&]() {
bfs_state->ResetTaskIndex(); // Reset task index safely
});

// Final synchronization after processing
barrier->Wait([&]() {
std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
bfs_state->ResetTaskIndex();
});
barrier->Wait();

}
}

void PhysicalIterativeTask::ReachDetect() const {
auto &bfs_state = state.global_bfs_state;
Expand Down
Loading

0 comments on commit 3b60e94

Please sign in to comment.