Skip to content

Commit

Permalink
Adding a lock for updates to change, and add a barrier to avoid reset…
Browse files Browse the repository at this point in the history
…ting to false
  • Loading branch information
Dtenwolde committed Dec 18, 2024
1 parent b7b4754 commit 115b9ef
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 22 deletions.
75 changes: 53 additions & 22 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) {
barrier->Wait();
if (worker_id == 0) {
state->Clear();
// std::cout << "Started searches: " << state->started_searches << std::endl;
// std::cout << "Number of pairs: " << state->pairs->size() << std::endl;
}

barrier->Wait();
}
event->FinishTask();
Expand Down Expand Up @@ -77,14 +80,22 @@ void ShortestPathTask::IterativePath() {
auto &parents_ve = state->parents_ve;
auto &change = state->change;

// Attempt to get a task range
bool has_tasks = SetTaskRange();
for (auto i = left; i < right; i++) {
next[i] = 0;

// Attempt to get a task range
while (has_tasks) {
for (auto i = left; i < right; i++) {
next[i].reset();
}
has_tasks = SetTaskRange();
}

barrier->Wait([&]() { state->ResetTaskIndex(); });

// Synchronize after clearing
barrier->Wait();

has_tasks = SetTaskRange();
// Main processing loop
while (has_tasks) {

Expand All @@ -98,12 +109,19 @@ void ShortestPathTask::IterativePath() {
next[n] |= visit[i];
}
for (auto l = 0; l < LANE_LIMIT; l++) {
// Create the mask: true (-1 in all bits) if condition is met, else 0
uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l]) ? ~uint64_t(0) : 0;
// Create the mask: true (-1 in all bits) if condition is met, else
// 0
uint64_t mask = ((parents_ve[n][l].GetV() == -1) && visit[i][l])
? ~uint64_t(0)
: 0;

// Use the mask to conditionally update the `value` field of the `ve` struct
uint64_t new_value = (static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) | (edge_id & parents_ve[n][l].e_mask);
parents_ve[n][l].value = (mask & new_value) | (~mask & parents_ve[n][l].value);
// Use the mask to conditionally update the `value` field of the
// `ve` struct
uint64_t new_value =
(static_cast<uint64_t>(i) << parents_ve[n][l].e_bits) |
(edge_id & parents_ve[n][l].e_mask);
parents_ve[n][l].value =
(mask & new_value) | (~mask & parents_ve[n][l].value);
}
}
}
Expand All @@ -114,55 +132,64 @@ void ShortestPathTask::IterativePath() {
}

// Synchronize at the end of the main processing
barrier->Wait([&]() {
state->ResetTaskIndex();
});
barrier->Wait([&]() { state->ResetTaskIndex(); });
barrier->Wait();

// Second processing stage (if needed)
has_tasks = SetTaskRange();
change = false;
barrier->Wait();
while (has_tasks) {
for (auto i = left; i < right; i++) {
if (next[i].any()) {
next[i] &= ~seen[i];
seen[i] |= next[i];
change |= next[i].any();
if (next[i].any()) {
std::lock_guard<std::mutex> lock(state->change_lock);
change = true;
}
}
}
has_tasks = SetTaskRange();
}
// std::cout << "Worker: " << worker_id << " Iteration: " << state->iter << " Change: " << change << std::endl;

// Synchronize again
barrier->Wait([&]() {
state->ResetTaskIndex();
});
barrier->Wait([&]() { state->ResetTaskIndex(); });
barrier->Wait();
}

void ShortestPathTask::ReachDetect() {
// detect lanes that finished
// std::cout << "Got in reach detect" << std::endl;
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = state->lane_to_num[lane];
if (search_num >= 0 && !state->lane_completed.test(lane)) { // Active lane that has not yet completed
if (search_num >= 0 &&
!state->lane_completed.test(
lane)) { // Active lane that has not yet completed
//! Check if dst for a source has been seen
int64_t dst_pos = state->vdata_dst.sel->get_index(search_num);
if (state->seen[state->dst[dst_pos]][lane]) {
state->active--; // Decrement active count
state->lane_completed.set(lane); // Mark this lane as completed
state->active--; // Decrement active count
state->lane_completed.set(lane); // Mark this lane as completed
}
}
}
// std::cout << "Active lanes: " << state->active << std::endl;
if (state->active == 0) {
state->change = false;
}
// into the next iteration
state->iter++;
// std::cout << "Change: " << state->change << std::endl;
// std::cout << "Incremented iteration counter" << std::endl;
}

void ShortestPathTask::PathConstruction() {
auto result_data = FlatVector::GetData<list_entry_t>(state->pf_results->data[0]);
auto result_data =
FlatVector::GetData<list_entry_t>(state->pf_results->data[0]);
auto &result_validity = FlatVector::Validity(state->pf_results->data[0]);
// std::cout << "Iterations: " << state->iter << std::endl;
//! Reconstruct the paths
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = state->lane_to_num[lane];
Expand All @@ -173,11 +200,15 @@ void ShortestPathTask::PathConstruction() {
//! Searches that have stopped have found a path
int64_t src_pos = state->vdata_src.sel->get_index(search_num);
int64_t dst_pos = state->vdata_dst.sel->get_index(search_num);

// std::cout << state->src[src_pos] << " " << state->dst[dst_pos] << std::endl;

if (state->src[src_pos] == state->dst[dst_pos]) { // Source == destination
unique_ptr<Vector> output =
make_uniq<Vector>(LogicalType::LIST(LogicalType::BIGINT));
ListVector::PushBack(*output, state->src[src_pos]);
ListVector::Append(state->pf_results->data[0], ListVector::GetEntry(*output),
ListVector::Append(state->pf_results->data[0],
ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = state->current_batch_path_list_len;
Expand Down Expand Up @@ -220,8 +251,8 @@ void ShortestPathTask::PathConstruction() {

result_data[search_num].length = list_len;
result_data[search_num].offset = state->current_batch_path_list_len;
ListVector::Append(state->pf_results->data[0], ListVector::GetEntry(*output),
list_len);
ListVector::Append(state->pf_results->data[0],
ListVector::GetEntry(*output), list_len);
state->current_batch_path_list_len += list_len;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/include/duckpgq/core/operator/bfs_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class BFSState : public enable_shared_from_this<BFSState> {

// lock for next
mutable vector<mutex> element_locks;
mutex change_lock;
};

} // namespace core
Expand Down

0 comments on commit 115b9ef

Please sign in to comment.