diff --git a/src/core/operator/CMakeLists.txt b/src/core/operator/CMakeLists.txt index c46a28a..b138a62 100644 --- a/src/core/operator/CMakeLists.txt +++ b/src/core/operator/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(event) add_subdirectory(task) set(EXTENSION_SOURCES + ${CMAKE_CURRENT_SOURCE_DIR}/bfs_state.cpp ${CMAKE_CURRENT_SOURCE_DIR}/duckpgq_bind.cpp ${CMAKE_CURRENT_SOURCE_DIR}/logical_path_finding_operator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/physical_path_finding_operator.cpp diff --git a/src/core/operator/bfs_state.cpp b/src/core/operator/bfs_state.cpp new file mode 100644 index 0000000..0fe3ff5 --- /dev/null +++ b/src/core/operator/bfs_state.cpp @@ -0,0 +1,132 @@ +#include "duckpgq/core/operator/bfs_state.hpp" + +#include +#include +#include + +namespace duckpgq { + +namespace core { + +GlobalBFSState::GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, + idx_t num_threads_, string mode_, ClientContext &context_) + : pairs(pairs_), iter(1), csr(csr_), v_size(vsize_), change(false), + started_searches(0), context(context_), seen(vsize_), + visit1(vsize_), visit2(vsize_), num_threads(num_threads_), + element_locks(vsize_), + mode(std::move(mode_)), parents_ve(vsize_) { + auto types = pairs->Types(); + if (mode == "iterativelength") { + types.push_back(LogicalType::BIGINT); + } else if (mode == "shortestpath") { + types.push_back(LogicalType::LIST(LogicalType::BIGINT)); + } else { + throw NotImplementedException("Mode not supported"); + } + + results = make_uniq(Allocator::Get(context), types); + results->InitializeScan(result_scan_state); + + // Only have to initialize the current batch and state once. + pairs->InitializeScan(input_scan_state); + total_pairs_processed = 0; // Initialize the total pairs processed + + + CreateTasks(); + scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size()); + barrier = make_uniq(scheduled_threads); +} + +void GlobalBFSState::Clear() { + iter = 1; + active = 0; + change = false; + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + visit1[i] = 0; + if (mode == "shortestpath") { + for (auto j = 0; j < LANE_LIMIT; j++) { + parents_ve[i][j] = {-1, -1}; + } + } + } +} + +void GlobalBFSState::CreateTasks() { + // workerTasks[workerId] = [task1, task2, ...] + // vector>> worker_tasks(num_threads); + // auto cur_worker = 0; + int64_t *v = (int64_t *)csr->v; + int64_t current_task_edges = 0; + idx_t current_task_start = 0; + for (idx_t v_idx = 0; v_idx < (idx_t)v_size; v_idx++) { + auto number_of_edges = v[v_idx + 1] - v[v_idx]; + if (current_task_edges + number_of_edges > split_size) { + global_task_queue.push_back({current_task_start, v_idx}); + current_task_start = v_idx; + current_task_edges = 0; // reset + } + + current_task_edges += number_of_edges; + } + + // Final task if there are any remaining edges + if (current_task_start < (idx_t)v_size) { + global_task_queue.push_back({current_task_start, v_size}); + } + // std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl; +} + + +shared_ptr> GlobalBFSState::FetchTask() { + std::unique_lock lock(queue_mutex); // Lock the mutex to access the queue + + // Log entry into FetchTask + // std::cout << "FetchTask: Checking tasks. Current index: " << current_task_index + // << ", Total tasks: " << global_task_queue.size() << std::endl; + + // Avoid unnecessary waiting if no tasks are available + if (current_task_index >= global_task_queue.size()) { + // std::cout << "FetchTask: No more tasks available. Exiting." << std::endl; + return nullptr; // No more tasks + } + + // Wait until a task is available or the queue is finalized + queue_cv.wait(lock, [this]() { + return current_task_index < global_task_queue.size(); + }); + + // Fetch the next task and increment the task index + if (current_task_index < global_task_queue.size()) { + auto task = make_shared_ptr>(global_task_queue[current_task_index]); + current_task_index++; + + // Log the fetched task + // std::cout << "FetchTask: Fetched task " << current_task_index - 1 + // << " -> [" << task->first << ", " << task->second << "]" << std::endl; + + return task; + } + + // Log no tasks available after wait + // std::cout << "FetchTask: No more tasks available after wait. Exiting." << std::endl; + return nullptr; +} + +void GlobalBFSState::ResetTaskIndex() { + std::lock_guard lock(queue_mutex); // Lock to reset index safely + current_task_index = 0; // Reset the task index for the next stage + queue_cv.notify_all(); // Notify all threads that tasks are available +} + +pair GlobalBFSState::BoundaryCalculation(idx_t worker_id) const { + idx_t block_size = ceil((double)v_size / num_threads); + block_size = block_size == 0 ? 1 : block_size; + idx_t left = block_size * worker_id; + idx_t right = std::min(block_size * (worker_id + 1), (idx_t)v_size); + return {left, right}; +} + +} // namespace core + +} // namespace duckpgq \ No newline at end of file diff --git a/src/core/operator/event/shortest_path_event.cpp b/src/core/operator/event/shortest_path_event.cpp index 6ab084f..e54d7ad 100644 --- a/src/core/operator/event/shortest_path_event.cpp +++ b/src/core/operator/event/shortest_path_event.cpp @@ -16,20 +16,30 @@ void ShortestPathEvent::Schedule() { auto &context = pipeline->GetClientContext(); // std::cout << gbfs_state->csr->ToString(); vector> bfs_tasks; + std::cout << "Scheduling threads " << gbfs_state->scheduled_threads << std::endl; for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) { + std::cout << "Scheduling task" << std::endl; bfs_tasks.push_back(make_uniq( shared_from_this(), context, gbfs_state, tnum, op)); } SetTasks(std::move(bfs_tasks)); } -void ShortestPathEvent::FinishEvent() { - // if remaining pairs, schedule the BFS for the next batch - if (gbfs_state->total_pairs_processed < gbfs_state->pairs->Count()) { - gbfs_state->Clear(); - gbfs_state->ScheduleBFSEvent(*pipeline, *this, gbfs_state->op); - } -} +// void ShortestPathEvent::FinishEvent() { +// std::cout << "Total pairs processed: " << gbfs_state->started_searches << std::endl; +// std::cout << "Number of pairs: " << gbfs_state->current_pairs_batch->size() << std::endl; +// gbfs_state->path_finding_result->Print(); +// // if remaining pairs, schedule the BFS for the next batch +// if (gbfs_state->started_searches == gbfs_state->current_pairs_batch->size()) { +// gbfs_state->path_finding_result->SetCardinality(gbfs_state->current_pairs_batch->size()); +// gbfs_state->total_pairs_processed += gbfs_state->current_pairs_batch->size(); +// gbfs_state->current_pairs_batch->Fuse(*gbfs_state->path_finding_result); +// gbfs_state->results->Append(*gbfs_state->current_pairs_batch); +// +// } +// std::cout << gbfs_state->started_searches << " " << gbfs_state->pairs->Count() << std::endl; +// std::cout << "Finished event" << std::endl; +// } } // namespace core } // namespace duckpgq \ No newline at end of file diff --git a/src/core/operator/physical_path_finding_operator.cpp b/src/core/operator/physical_path_finding_operator.cpp index 3e3f7a6..2a8fb4f 100644 --- a/src/core/operator/physical_path_finding_operator.cpp +++ b/src/core/operator/physical_path_finding_operator.cpp @@ -10,8 +10,7 @@ #include "duckdb/parallel/event.hpp" #include "duckdb/parallel/meta_pipeline.hpp" #include "duckdb/parallel/thread_context.hpp" -#include "duckpgq/core/utils/duckpgq_barrier.hpp" -#include +#include "duckpgq/core/operator/bfs_state.hpp" #include #include #include @@ -37,123 +36,6 @@ PhysicalPathFinding::PhysicalPathFinding(LogicalExtensionOperator &op, mode = path_finding_op.mode; } -GlobalBFSState::GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, - idx_t num_threads_, string mode_, ClientContext &context_) - : pairs(pairs_), iter(1), csr(csr_), v_size(vsize_), change(false), - started_searches(0), context(context_), seen(vsize_), - visit1(vsize_), visit2(vsize_), num_threads(num_threads_), - element_locks(vsize_), - mode(std::move(mode_)), parents_ve(vsize_) { - auto types = pairs->Types(); - if (mode == "iterativelength") { - types.push_back(LogicalType::BIGINT); - } else if (mode == "shortestpath") { - types.push_back(LogicalType::LIST(LogicalType::BIGINT)); - } else { - throw NotImplementedException("Mode not supported"); - } - - results = make_uniq(Allocator::Get(context), types); - results->InitializeScan(result_scan_state); - - // Only have to initialize the the current batch and state once. - pairs->InitializeScan(input_scan_state); - - - CreateTasks(); - scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size()); - barrier = make_uniq(scheduled_threads); -} - -void GlobalBFSState::Clear() { - iter = 1; - active = 0; - change = false; - // empty visit vectors - for (auto i = 0; i < v_size; i++) { - visit1[i] = 0; - if (mode == "shortestpath") { - for (auto j = 0; j < LANE_LIMIT; j++) { - parents_ve[i][j] = {-1, -1}; - } - } - } -} - -void GlobalBFSState::CreateTasks() { - // workerTasks[workerId] = [task1, task2, ...] - // vector>> worker_tasks(num_threads); - // auto cur_worker = 0; - int64_t *v = (int64_t *)csr->v; - int64_t current_task_edges = 0; - idx_t current_task_start = 0; - for (idx_t v_idx = 0; v_idx < (idx_t)v_size; v_idx++) { - auto number_of_edges = v[v_idx + 1] - v[v_idx]; - if (current_task_edges + number_of_edges > split_size) { - global_task_queue.push_back({current_task_start, v_idx}); - current_task_start = v_idx; - current_task_edges = 0; // reset - } - - current_task_edges += number_of_edges; - } - - // Final task if there are any remaining edges - if (current_task_start < (idx_t)v_size) { - global_task_queue.push_back({current_task_start, v_size}); - } - // std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl; -} - - -shared_ptr> GlobalBFSState::FetchTask() { - std::unique_lock lock(queue_mutex); // Lock the mutex to access the queue - - // Log entry into FetchTask - // std::cout << "FetchTask: Checking tasks. Current index: " << current_task_index - // << ", Total tasks: " << global_task_queue.size() << std::endl; - - // Avoid unnecessary waiting if no tasks are available - if (current_task_index >= global_task_queue.size()) { - // std::cout << "FetchTask: No more tasks available. Exiting." << std::endl; - return nullptr; // No more tasks - } - - // Wait until a task is available or the queue is finalized - queue_cv.wait(lock, [this]() { - return current_task_index < global_task_queue.size(); - }); - - // Fetch the next task and increment the task index - if (current_task_index < global_task_queue.size()) { - auto task = make_shared_ptr>(global_task_queue[current_task_index]); - current_task_index++; - - // Log the fetched task - // std::cout << "FetchTask: Fetched task " << current_task_index - 1 - // << " -> [" << task->first << ", " << task->second << "]" << std::endl; - - return task; - } - - // Log no tasks available after wait - // std::cout << "FetchTask: No more tasks available after wait. Exiting." << std::endl; - return nullptr; -} - -void GlobalBFSState::ResetTaskIndex() { - std::lock_guard lock(queue_mutex); // Lock to reset index safely - current_task_index = 0; // Reset the task index for the next stage - queue_cv.notify_all(); // Notify all threads that tasks are available -} - -pair GlobalBFSState::BoundaryCalculation(idx_t worker_id) const { - idx_t block_size = ceil((double)v_size / num_threads); - block_size = block_size == 0 ? 1 : block_size; - idx_t left = block_size * worker_id; - idx_t right = std::min(block_size * (worker_id + 1), (idx_t)v_size); - return {left, right}; -} //===--------------------------------------------------------------------===// // Sink @@ -253,7 +135,7 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, gstate.global_bfs_state = make_shared_ptr(global_tasks, gstate.csr, gstate.csr->vsize-2, gstate.num_threads, mode, context); - gstate.global_bfs_state->ScheduleBFSEvent(pipeline, event, this); + gstate.global_bfs_state->InitializeBFS(pipeline, event, this); // Move to the next input child ++gstate.child; @@ -261,12 +143,58 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, return SinkFinalizeType::READY; } -void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) { +void GlobalBFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event) { + std::cout << "Starting BFS for " << current_pairs_batch->size() << " pairs with " << LANE_LIMIT << " batch size." << std::endl; + std::cout << "Number of started searches: " << started_searches << std::endl; + auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]); + std::bitset seen_mask; + seen_mask.set(); + + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < current_pairs_batch->size()) { + auto search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + } else { + visit1[src[src_pos]][lane] = true; + // bfs_state->seen[bfs_state->src[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + seen_mask[lane] = false; + break; + } + } + } + for (int64_t i = 0; i < v_size; i++) { + seen[i] = seen_mask; + } + + if (mode == "iterativelength") { + throw NotImplementedException("Iterative length has not been implemented yet"); + // event.InsertEvent( + // make_shared_ptr(gstate, pipeline, *this)); + } else if (mode == "shortestpath") { + std::cout << "Scheduling shortest path event" << std::endl; + event.InsertEvent( + make_shared_ptr(shared_from_this(), pipeline, *op)); + } else { + throw NotImplementedException("Mode not supported"); + } +} + + +void GlobalBFSState::InitializeBFS(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) { current_pairs_batch = make_uniq(); pairs->InitializeScanChunk(input_scan_state, *current_pairs_batch); pairs->Scan(input_scan_state, *current_pairs_batch); op = op_; + path_finding_result = make_uniq(); + path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)}); + current_batch_path_list_len = 0; + started_searches = 0; // reset active = 0; @@ -278,43 +206,23 @@ void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, const Ph dst = FlatVector::GetData(dst_data); // remaining pairs for current batch - if (started_searches < current_pairs_batch->size()) { - auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]); - std::bitset seen_mask; - seen_mask.set(); - - for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - lane_to_num[lane] = -1; - while (started_searches < current_pairs_batch->size()) { - auto search_num = started_searches++; - int64_t src_pos = vdata_src.sel->get_index(search_num); - if (!vdata_src.validity.RowIsValid(src_pos)) { - result_validity.SetInvalid(search_num); - } else { - visit1[src[src_pos]][lane] = true; - // bfs_state->seen[bfs_state->src[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - seen_mask[lane] = false; - break; - } - } - } - for (int64_t i = 0; i < v_size; i++) { - seen[i] = seen_mask; - } - - if (mode == "iterativelength") { - throw NotImplementedException("Iterative length has not been implemented yet"); - // event.InsertEvent( - // make_shared_ptr(gstate, pipeline, *this)); - } else if (mode == "shortestpath") { - event.InsertEvent( - make_shared_ptr(shared_from_this(), pipeline, *op)); - } else { - throw NotImplementedException("Mode not supported"); - } + while (started_searches < current_pairs_batch->size()) { + ScheduleBFSBatch(pipeline, event); + Clear(); + } + if (started_searches != current_pairs_batch->size()) { + throw InternalException("Number of started searches does not match the number of pairs"); } + // path_finding_result->SetCardinality(current_pairs_batch->size()); + // path_finding_result->Print(); + // current_pairs_batch->Fuse(*path_finding_result); + // current_pairs_batch->Print(); + // results->Append(*current_pairs_batch); + // total_pairs_processed += current_pairs_batch->size(); + // std::cout << "Total pairs processed: " << total_pairs_processed << std::endl; + // if (total_pairs_processed < pairs->Count()) { + // InitializeBFS(pipeline, event, op); + // } } InsertionOrderPreservingMap PhysicalPathFinding::ParamsToString() const { @@ -375,7 +283,8 @@ PhysicalPathFinding::GetLocalSourceState(ExecutionContext &context, SourceResultType PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result, OperatorSourceInput &input) const { auto &pf_sink = sink_state->Cast(); - + pf_sink.global_bfs_state->path_finding_result->SetCardinality(pf_sink.global_bfs_state->pairs->Count()); + pf_sink.global_bfs_state->path_finding_result->Print(); // If there are no pairs, we're done if (pf_sink.global_bfs_state->results->Count() == 0) { return SourceResultType::FINISHED; diff --git a/src/core/operator/task/shortest_path_task.cpp b/src/core/operator/task/shortest_path_task.cpp index 2a36c6b..f81e19b 100644 --- a/src/core/operator/task/shortest_path_task.cpp +++ b/src/core/operator/task/shortest_path_task.cpp @@ -15,7 +15,7 @@ ShortestPathTask::ShortestPathTask(shared_ptr event_p, TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { auto &barrier = state->barrier; - + std::cout << "Starting to execute task " << worker_id << std::endl; do { IterativePath(); @@ -25,36 +25,24 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { state->ResetTaskIndex(); } barrier->Wait(); - if (worker_id == 0) { ReachDetect(); } - - // std::cout << "Worker " << worker_id << ": Waiting at barrier before - // ResetTaskIndex." << std::endl; barrier->Wait(); if (worker_id == 0) { state->ResetTaskIndex(); - // std::cout << "Worker " << worker_id << ": ResetTaskIndex completed." << - // std::endl; } barrier->Wait(); - // std::cout << "Worker " << worker_id << ": Passed barrier after - // ResetTaskIndex." << std::endl; } while (state->change); barrier->Wait(); if (worker_id == 0) { - // std::cout << "Worker " << worker_id << " started path construction" << - // std::endl; PathConstruction(); - // std::cout << "Worker " << worker_id << " finished path construction" << - // std::endl; } // Final synchronization before finishing barrier->Wait(); - // std::cout << "Worker " << worker_id << " finishing task" << std::endl; + std::cout << "Worker " << worker_id << " finishing task" << std::endl; event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } @@ -160,11 +148,9 @@ void ShortestPathTask::ReachDetect() { } void ShortestPathTask::PathConstruction() { - auto path_finding_result = make_uniq(); - path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)}); - auto result_data = FlatVector::GetData(path_finding_result->data[0]); - auto &result_validity = FlatVector::Validity(path_finding_result->data[0]); - size_t list_len = 0; + + auto result_data = FlatVector::GetData(state->path_finding_result->data[0]); + auto &result_validity = FlatVector::Validity(state->path_finding_result->data[0]); //! Reconstruct the paths for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = state->lane_to_num[lane]; @@ -179,11 +165,11 @@ void ShortestPathTask::PathConstruction() { unique_ptr output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); ListVector::PushBack(*output, state->src[src_pos]); - ListVector::Append(path_finding_result->data[0], ListVector::GetEntry(*output), + ListVector::Append(state->path_finding_result->data[0], ListVector::GetEntry(*output), ListVector::GetListSize(*output)); result_data[search_num].length = ListVector::GetListSize(*output); - result_data[search_num].offset = list_len; - list_len += result_data[search_num].length; + result_data[search_num].offset = state->current_batch_path_list_len; + state->current_batch_path_list_len += result_data[search_num].length; continue; } std::vector output_vector; @@ -221,17 +207,11 @@ void ShortestPathTask::PathConstruction() { } result_data[search_num].length = ListVector::GetListSize(*output); - result_data[search_num].offset = list_len; - ListVector::Append(path_finding_result->data[0], ListVector::GetEntry(*output), + result_data[search_num].offset = state->current_batch_path_list_len; + ListVector::Append(state->path_finding_result->data[0], ListVector::GetEntry(*output), ListVector::GetListSize(*output)); - list_len += result_data[search_num].length; + state->current_batch_path_list_len += result_data[search_num].length; } - path_finding_result->SetCardinality(state->current_pairs_batch->size()); - - state->current_pairs_batch->Fuse(*path_finding_result); - // state->current_pairs_batch->Print(); - state->results->Append(*state->current_pairs_batch); - // state->results->Print(); } } // namespace core diff --git a/src/include/duckpgq/core/operator/bfs_state.hpp b/src/include/duckpgq/core/operator/bfs_state.hpp new file mode 100644 index 0000000..48c0176 --- /dev/null +++ b/src/include/duckpgq/core/operator/bfs_state.hpp @@ -0,0 +1,75 @@ +#pragma once + +#include "duckpgq/common.hpp" +#include "duckpgq/core/operator/physical_path_finding_operator.hpp" + +namespace duckpgq { + +namespace core { + + +class GlobalBFSState : public enable_shared_from_this { + +public: + GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, + idx_t num_threads_, string mode_, ClientContext &context_); + + void InitializeBFS(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op); + void ScheduleBFSBatch(Pipeline &pipeline, Event &event); + + void Clear(); + + void CreateTasks(); + shared_ptr> FetchTask(); // Function to fetch a task + void ResetTaskIndex(); + + pair BoundaryCalculation(idx_t worker_id) const; + CSR *csr; + unique_ptr &pairs; // (src, dst) pairs + unique_ptr current_pairs_batch; + const PhysicalPathFinding *op; + int64_t iter; + int64_t v_size; // Number of vertices + bool change; + idx_t started_searches; // Number of started searches in current batch + int64_t *src; + int64_t *dst; + UnifiedVectorFormat vdata_src; + UnifiedVectorFormat vdata_dst; + int64_t lane_to_num[LANE_LIMIT]; + idx_t active = 0; + unique_ptr path_finding_result; + size_t current_batch_path_list_len; // Length of the current batch path list + unique_ptr results; // results of (src, dst, path-finding) + ColumnDataScanState result_scan_state; + ColumnDataScanState input_scan_state; + ColumnDataAppendState append_state; + ClientContext &context; + vector> seen; + vector> visit1; + vector> visit2; + vector> parents_ve; + + idx_t total_pairs_processed; + idx_t num_threads; + idx_t scheduled_threads; + + // task_queues[workerId] = {curTaskIdx, queuedTasks} + // queuedTasks[curTaskIx] = {start, end} + vector> global_task_queue; + std::mutex queue_mutex; // Mutex for synchronizing access + std::condition_variable queue_cv; // Condition variable for task availability + size_t current_task_index = 0; // Index to track the current task + int64_t split_size = 256; + + unique_ptr barrier; + + // lock for next + mutable vector element_locks; + + string mode; +}; + +} // namespace core + +} // namespace duckpgq \ No newline at end of file diff --git a/src/include/duckpgq/core/operator/event/shortest_path_event.hpp b/src/include/duckpgq/core/operator/event/shortest_path_event.hpp index 37ab332..3be5155 100644 --- a/src/include/duckpgq/core/operator/event/shortest_path_event.hpp +++ b/src/include/duckpgq/core/operator/event/shortest_path_event.hpp @@ -21,7 +21,7 @@ class ShortestPathEvent : public BasePipelineEvent { explicit ShortestPathEvent(shared_ptr gbfs_state_p, Pipeline &pipeline_p, const PhysicalPathFinding& op_p); void Schedule() override; - void FinishEvent() override; + // void FinishEvent() override; private: shared_ptr gbfs_state; diff --git a/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp b/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp index 03e7c78..117fa43 100644 --- a/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp +++ b/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp @@ -90,64 +90,7 @@ class PathFindingLocalSinkState : public LocalSinkState { }; -class GlobalBFSState : public enable_shared_from_this { -public: - GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, - idx_t num_threads_, string mode_, ClientContext &context_); - - void ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op); - - void Clear(); - - void CreateTasks(); - shared_ptr> FetchTask(); // Function to fetch a task - void ResetTaskIndex(); - - pair BoundaryCalculation(idx_t worker_id) const; - CSR *csr; - unique_ptr &pairs; // (src, dst) pairs - unique_ptr current_pairs_batch; - const PhysicalPathFinding *op; - int64_t iter; - int64_t v_size; // Number of vertices - bool change; - idx_t started_searches; // Number of started searches in current batch - int64_t *src; - int64_t *dst; - UnifiedVectorFormat vdata_src; - UnifiedVectorFormat vdata_dst; - int64_t lane_to_num[LANE_LIMIT]; - idx_t active = 0; - unique_ptr results; // results of (src, dst, path-finding) - ColumnDataScanState result_scan_state; - ColumnDataScanState input_scan_state; - ColumnDataAppendState append_state; - ClientContext &context; - vector> seen; - vector> visit1; - vector> visit2; - vector> parents_ve; - - idx_t total_pairs_processed; - idx_t num_threads; - idx_t scheduled_threads; - - // task_queues[workerId] = {curTaskIdx, queuedTasks} - // queuedTasks[curTaskIx] = {start, end} - vector> global_task_queue; - std::mutex queue_mutex; // Mutex for synchronizing access - std::condition_variable queue_cv; // Condition variable for task availability - size_t current_task_index = 0; // Index to track the current task - int64_t split_size = 256; - - unique_ptr barrier; - - // lock for next - mutable vector element_locks; - - string mode; -}; class PathFindingGlobalSinkState : public GlobalSinkState { public: diff --git a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp index 1ea576c..608375b 100644 --- a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp +++ b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp @@ -1,7 +1,7 @@ #pragma once #include "duckpgq/common.hpp" -#include +#include namespace duckpgq { namespace core {