diff --git a/src/core/operator/task/shortest_path_task.cpp b/src/core/operator/task/shortest_path_task.cpp index 24877aa..e1af8fa 100644 --- a/src/core/operator/task/shortest_path_task.cpp +++ b/src/core/operator/task/shortest_path_task.cpp @@ -49,7 +49,10 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { barrier->Wait(); if (worker_id == 0) { state->Clear(); + // std::cout << "Started searches: " << state->started_searches << std::endl; + // std::cout << "Number of pairs: " << state->pairs->size() << std::endl; } + barrier->Wait(); } event->FinishTask(); @@ -77,14 +80,22 @@ void ShortestPathTask::IterativePath() { auto &parents_ve = state->parents_ve; auto &change = state->change; - // Attempt to get a task range bool has_tasks = SetTaskRange(); - for (auto i = left; i < right; i++) { - next[i] = 0; + + // Attempt to get a task range + while (has_tasks) { + for (auto i = left; i < right; i++) { + next[i].reset(); + } + has_tasks = SetTaskRange(); } + barrier->Wait([&]() { state->ResetTaskIndex(); }); + // Synchronize after clearing barrier->Wait(); + + has_tasks = SetTaskRange(); // Main processing loop while (has_tasks) { @@ -98,12 +109,19 @@ void ShortestPathTask::IterativePath() { next[n] |= visit[i]; } for (auto l = 0; l < LANE_LIMIT; l++) { - // Create the mask: true (-1 in all bits) if condition is met, else 0 - uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l]) ? ~uint64_t(0) : 0; + // Create the mask: true (-1 in all bits) if condition is met, else + // 0 + uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l]) + ? ~uint64_t(0) + : 0; - // Use the mask to conditionally update the `value` field of the `ve` struct - uint64_t new_value = (static_cast(i) << parents_ve[n][l].e_bits) | (edge_id & parents_ve[n][l].e_mask); - parents_ve[n][l].value = (mask & new_value) | (~mask & parents_ve[n][l].value); + // Use the mask to conditionally update the `value` field of the + // `ve` struct + uint64_t new_value = + (static_cast(i) << parents_ve[n][l].e_bits) | + (edge_id & parents_ve[n][l].e_mask); + parents_ve[n][l].value = + (mask & new_value) | (~mask & parents_ve[n][l].value); } } } @@ -114,55 +132,64 @@ void ShortestPathTask::IterativePath() { } // Synchronize at the end of the main processing - barrier->Wait([&]() { - state->ResetTaskIndex(); - }); + barrier->Wait([&]() { state->ResetTaskIndex(); }); barrier->Wait(); // Second processing stage (if needed) has_tasks = SetTaskRange(); change = false; + barrier->Wait(); while (has_tasks) { for (auto i = left; i < right; i++) { if (next[i].any()) { next[i] &= ~seen[i]; seen[i] |= next[i]; - change |= next[i].any(); + if (next[i].any()) { + std::lock_guard lock(state->change_lock); + change = true; + } } } has_tasks = SetTaskRange(); } + // std::cout << "Worker: " << worker_id << " Iteration: " << state->iter << " Change: " << change << std::endl; // Synchronize again - barrier->Wait([&]() { - state->ResetTaskIndex(); - }); + barrier->Wait([&]() { state->ResetTaskIndex(); }); barrier->Wait(); } void ShortestPathTask::ReachDetect() { // detect lanes that finished + // std::cout << "Got in reach detect" << std::endl; for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = state->lane_to_num[lane]; - if (search_num >= 0 && !state->lane_completed.test(lane)) { // Active lane that has not yet completed + if (search_num >= 0 && + !state->lane_completed.test( + lane)) { // Active lane that has not yet completed //! Check if dst for a source has been seen int64_t dst_pos = state->vdata_dst.sel->get_index(search_num); if (state->seen[state->dst[dst_pos]][lane]) { - state->active--; // Decrement active count - state->lane_completed.set(lane); // Mark this lane as completed + state->active--; // Decrement active count + state->lane_completed.set(lane); // Mark this lane as completed } } } + // std::cout << "Active lanes: " << state->active << std::endl; if (state->active == 0) { state->change = false; } // into the next iteration state->iter++; + // std::cout << "Change: " << state->change << std::endl; + // std::cout << "Incremented iteration counter" << std::endl; } void ShortestPathTask::PathConstruction() { - auto result_data = FlatVector::GetData(state->pf_results->data[0]); + auto result_data = + FlatVector::GetData(state->pf_results->data[0]); auto &result_validity = FlatVector::Validity(state->pf_results->data[0]); + // std::cout << "Iterations: " << state->iter << std::endl; //! Reconstruct the paths for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = state->lane_to_num[lane]; @@ -173,11 +200,15 @@ void ShortestPathTask::PathConstruction() { //! Searches that have stopped have found a path int64_t src_pos = state->vdata_src.sel->get_index(search_num); int64_t dst_pos = state->vdata_dst.sel->get_index(search_num); + + // std::cout << state->src[src_pos] << " " << state->dst[dst_pos] << std::endl; + if (state->src[src_pos] == state->dst[dst_pos]) { // Source == destination unique_ptr output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); ListVector::PushBack(*output, state->src[src_pos]); - ListVector::Append(state->pf_results->data[0], ListVector::GetEntry(*output), + ListVector::Append(state->pf_results->data[0], + ListVector::GetEntry(*output), ListVector::GetListSize(*output)); result_data[search_num].length = ListVector::GetListSize(*output); result_data[search_num].offset = state->current_batch_path_list_len; @@ -220,8 +251,8 @@ void ShortestPathTask::PathConstruction() { result_data[search_num].length = list_len; result_data[search_num].offset = state->current_batch_path_list_len; - ListVector::Append(state->pf_results->data[0], ListVector::GetEntry(*output), - list_len); + ListVector::Append(state->pf_results->data[0], + ListVector::GetEntry(*output), list_len); state->current_batch_path_list_len += list_len; } } diff --git a/src/include/duckpgq/core/operator/bfs_state.hpp b/src/include/duckpgq/core/operator/bfs_state.hpp index acb7d49..b7146a3 100644 --- a/src/include/duckpgq/core/operator/bfs_state.hpp +++ b/src/include/duckpgq/core/operator/bfs_state.hpp @@ -73,6 +73,7 @@ class BFSState : public enable_shared_from_this { // lock for next mutable vector element_locks; + mutex change_lock; }; } // namespace core