Skip to content

Commit

Permalink
Add a lane completed check to avoid decrementing active searches mult…
Browse files Browse the repository at this point in the history
…iple times if it terminated in an earlier iteration
  • Loading branch information
Dtenwolde committed Dec 18, 2024
1 parent 2dd4b48 commit 7c85657
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/core/operator/bfs_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void BFSState::Clear() {
}
}
}
lane_completed.reset();
}

void BFSState::CreateTasks() {
Expand Down
10 changes: 3 additions & 7 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ ShortestPathTask::ShortestPathTask(shared_ptr<Event> event_p,
TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
auto &barrier = state->barrier;
while (state->started_searches < state->pairs->size()) {
// std::cout << "Started searches: " << state->started_searches << std::endl;
barrier->Wait();
if (worker_id == 0) {
state->InitializeLanes();
// state->pairs->Print();
}
barrier->Wait();
do {
Expand Down Expand Up @@ -59,7 +57,6 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
}
barrier->Wait();
}
// std::cout << "Worker " << worker_id << " finishing task" << std::endl;
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}
Expand Down Expand Up @@ -90,7 +87,6 @@ void ShortestPathTask::IterativePath() {

// Main processing loop
while (has_tasks) {
// std::cout << worker_id << " " << left << " " << right << std::endl;

for (auto i = left; i < right; i++) {
if (visit[i].any()) {
Expand Down Expand Up @@ -148,11 +144,12 @@ void ShortestPathTask::ReachDetect() {
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = state->lane_to_num[lane];
if (search_num >= 0) { // active lane
if (search_num >= 0 && !state->lane_completed.test(lane)) { // Active lane that has not yet completed
//! Check if dst for a source has been seen
int64_t dst_pos = state->vdata_dst.sel->get_index(search_num);
if (state->seen[state->dst[dst_pos]][lane]) {
state->active--;
state->active--; // Decrement active count
state->lane_completed.set(lane); // Mark this lane as completed
}
}
}
Expand Down Expand Up @@ -193,7 +190,6 @@ void ShortestPathTask::PathConstruction() {

auto parent_vertex = state->parents_ve[state->dst[dst_pos]][lane].GetV();
auto parent_edge = state->parents_ve[state->dst[dst_pos]][lane].GetE();

output_vector.push_back(state->dst[dst_pos]); // Add destination vertex
output_vector.push_back(parent_edge);
while (parent_vertex != source_v) { // Continue adding vertices until we
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckpgq/core/operator/bfs_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class BFSState : public enable_shared_from_this<BFSState> {
vector<std::bitset<LANE_LIMIT>> visit2;
vector<std::array<ve, LANE_LIMIT>> parents_ve;

std::bitset<LANE_LIMIT> lane_completed;

idx_t total_pairs_processed;
idx_t num_threads;
idx_t scheduled_threads;
Expand Down

0 comments on commit 7c85657

Please sign in to comment.