From 07e4bfad5a0b4c89afd42467ce8151b388350c82 Mon Sep 17 00:00:00 2001 From: dtenwolde Date: Tue, 3 Dec 2024 16:52:21 +0100 Subject: [PATCH] Appending datachunk after every batch of pairs --- .../operator/event/shortest_path_event.cpp | 15 ++- .../physical_path_finding_operator.cpp | 121 ++++++++---------- src/core/operator/task/shortest_path_task.cpp | 106 ++++++++------- .../operator/event/shortest_path_event.hpp | 6 +- .../physical_path_finding_operator.hpp | 37 +++--- .../core/operator/task/shortest_path_task.hpp | 4 +- 6 files changed, 139 insertions(+), 150 deletions(-) diff --git a/src/core/operator/event/shortest_path_event.cpp b/src/core/operator/event/shortest_path_event.cpp index 75d6ce1..1eb77f3 100644 --- a/src/core/operator/event/shortest_path_event.cpp +++ b/src/core/operator/event/shortest_path_event.cpp @@ -6,24 +6,29 @@ namespace duckpgq { namespace core { -ShortestPathEvent::ShortestPathEvent(GlobalBFSState &gbfs_state_p, +ShortestPathEvent::ShortestPathEvent(shared_ptr gbfs_state_p, Pipeline &pipeline_p, const PhysicalPathFinding &op_p) - : BasePipelineEvent(pipeline_p), gbfs_state(gbfs_state_p), op(op_p) { + : BasePipelineEvent(pipeline_p), gbfs_state(std::move(gbfs_state_p)), op(op_p) { } void ShortestPathEvent::Schedule() { auto &context = pipeline->GetClientContext(); - + std::cout << gbfs_state->csr->ToString(); vector> bfs_tasks; - size_t threads_to_schedule = std::min(gbfs_state.num_threads, (idx_t)gbfs_state.global_task_queue.size()); - for (idx_t tnum = 0; tnum < threads_to_schedule; tnum++) { + for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) { bfs_tasks.push_back(make_uniq( shared_from_this(), context, gbfs_state, tnum, op)); } SetTasks(std::move(bfs_tasks)); } +void ShortestPathEvent::FinishEvent() { + // if remaining pairs, schedule the BFS for the next batch + if (gbfs_state->total_pairs_processed < gbfs_state->pairs->Count()) { + gbfs_state->ScheduleBFSEvent(*pipeline, *this, gbfs_state->op); + } +} } // namespace core } // namespace duckpgq \ No newline at end of file diff --git a/src/core/operator/physical_path_finding_operator.cpp b/src/core/operator/physical_path_finding_operator.cpp index bb98fd5..994f939 100644 --- a/src/core/operator/physical_path_finding_operator.cpp +++ b/src/core/operator/physical_path_finding_operator.cpp @@ -37,37 +37,32 @@ PhysicalPathFinding::PhysicalPathFinding(LogicalExtensionOperator &op, mode = path_finding_op.mode; } -GlobalBFSState::GlobalBFSState(DataChunk &pairs_, CSR* csr_, int64_t vsize_, +GlobalBFSState::GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, idx_t num_threads_, string mode_, ClientContext &context_) : pairs(pairs_), iter(1), csr(csr_), v_size(vsize_), change(false), started_searches(0), total_len(0), context(context_), seen(vsize_), visit1(vsize_), visit2(vsize_), num_threads(num_threads_), element_locks(vsize_), mode(std::move(mode_)), parents_ve(vsize_) { - if (mode == "iterativelength") { // length - result.Initialize( - context, {LogicalType::BIGINT}, - pairs_.size()); - } else if (mode == "shortestpath") { // path - result.Initialize( - context, {LogicalType::LIST(LogicalType::BIGINT)}, - pairs_.size()); + auto types = pairs->Types(); + if (mode == "iterativelength") { + types.push_back(LogicalType::BIGINT); + } else if (mode == "shortestpath") { + types.push_back(LogicalType::LIST(LogicalType::BIGINT)); } else { throw NotImplementedException("Mode not supported"); } - result.SetCardinality(pairs_.size()); - auto &src_data = pairs.data[0]; - auto &dst_data = pairs.data[1]; - src_data.ToUnifiedFormat(pairs.size(), vdata_src); - dst_data.ToUnifiedFormat(pairs.size(), vdata_dst); - src = FlatVector::GetData(src_data); - dst = FlatVector::GetData(dst_data); - result.SetCapacity(pairs.size()); + results = make_uniq(Allocator::Get(context), types); + current_pairs_batch = make_uniq(); + + // Only have to initialize the the current batch and state once. + pairs->InitializeScan(scan_state); + pairs->InitializeScanChunk(scan_state, *current_pairs_batch); CreateTasks(); - size_t number_of_threads_to_schedule = std::min(num_threads, (idx_t)global_task_queue.size()); - barrier = make_uniq(number_of_threads_to_schedule); + scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size()); + barrier = make_uniq(scheduled_threads); } void GlobalBFSState::Clear() { @@ -107,7 +102,7 @@ void GlobalBFSState::CreateTasks() { if (current_task_start < (idx_t)v_size) { global_task_queue.push_back({current_task_start, v_size}); } - // std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl; + std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl; } @@ -182,16 +177,8 @@ PathFindingGlobalSinkState::PathFindingGlobalSinkState(ClientContext &context, global_csr_column_data = make_uniq(context, op.children[1]->GetTypes()); - auto result_types = op.children[0]->GetTypes(); - if (op.mode == "iterativelength") { - result_types.push_back(LogicalType::BIGINT); - } else { - result_types.push_back(LogicalType::LIST(LogicalType::BIGINT)); - } - results = make_uniq(context, result_types); child = 0; mode = op.mode; - pairs_processed = 0; auto &scheduler = TaskScheduler::GetScheduler(context); num_threads = scheduler.NumberOfThreads(); } @@ -260,41 +247,44 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, ++gstate.child; return SinkFinalizeType::READY; } - - while (gstate.pairs_processed < global_tasks->Count()) { - DataChunk pairs_to_process; - ColumnDataScanState scan_state; - global_tasks->Scan(scan_state, pairs_to_process); - - auto global_bfs_state = make_uniq(pairs_to_process, gstate.csr, gstate.csr->vsize-2, - gstate.num_threads, mode, context); - global_bfs_state->ScheduleBFSEvent(pipeline, event, gstate, this); - ColumnDataAppendState append_state; - // TODO add back in option to set the task size - gstate.pairs_processed += pairs_to_process.size(); - gstate.results->Append(append_state, global_bfs_state->result); - std::cout << "Processed " << gstate.pairs_processed << " pairs" << std::endl; - std::cout << "Total size: " << global_tasks->Count() << std::endl; + if (global_tasks->Count() == 0) { + return SinkFinalizeType::READY; } + auto global_bfs_state = make_shared_ptr(global_tasks, gstate.csr, gstate.csr->vsize-2, + gstate.num_threads, mode, context); + global_bfs_state->ScheduleBFSEvent(pipeline, event, this); + // Move to the next input child ++gstate.child; duckpgq_state->csr_to_delete.insert(gstate.csr_id); return SinkFinalizeType::READY; } -void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, - GlobalSinkState &state, const PhysicalPathFinding *op) { - auto &gstate = state.Cast(); - // remaining pairs - if (started_searches < pairs.size()) { - auto &result_validity = FlatVector::Validity(result.data[0]); +void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) { + current_pairs_batch->Reset(); + pairs->Scan(scan_state, *current_pairs_batch); + op = op_; + + started_searches = 0; // reset + active = 0; + + auto &src_data = current_pairs_batch->data[0]; + auto &dst_data = current_pairs_batch->data[1]; + src_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_src); + dst_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_dst); + src = FlatVector::GetData(src_data); + dst = FlatVector::GetData(dst_data); + + // remaining pairs for current batch + if (started_searches < current_pairs_batch->size()) { + auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]); std::bitset seen_mask; seen_mask.set(); for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { lane_to_num[lane] = -1; - while (started_searches < pairs.size()) { + while (started_searches < current_pairs_batch->size()) { auto search_num = started_searches++; int64_t src_pos = vdata_src.sel->get_index(search_num); if (!vdata_src.validity.RowIsValid(src_pos)) { @@ -317,9 +307,9 @@ void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, throw NotImplementedException("Iterative length has not been implemented yet"); // event.InsertEvent( // make_shared_ptr(gstate, pipeline, *this)); - } else if (gstate.mode == "shortestpath") { + } else if (mode == "shortestpath") { event.InsertEvent( - make_shared_ptr(*this, pipeline, op)); + make_shared_ptr(shared_from_this(), pipeline, *op)); } else { throw NotImplementedException("Mode not supported"); } @@ -360,22 +350,10 @@ class PathFindingGlobalSourceState : public GlobalSourceState { explicit PathFindingGlobalSourceState(const PhysicalPathFinding &op) : op(op), initialized(false) {} - void Initialize(PathFindingGlobalState &sink_state) { - lock_guard initializing(lock); - if (initialized) { - return; - } - initialized = true; - } - public: idx_t MaxThreads() override { return 1; } - - void GetNextPair(ClientContext &client, PathFindingGlobalState &gstate, - PathFindingLocalSourceState &lstate) {} - const PhysicalPathFinding &op; mutex lock; @@ -396,13 +374,14 @@ PhysicalPathFinding::GetLocalSourceState(ExecutionContext &context, SourceResultType PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result, OperatorSourceInput &input) const { auto &pf_sink = sink_state->Cast(); - - // If there are no pairs, we're done - if (pf_sink.results->Count() == 0) { - return SourceResultType::FINISHED; - } - ColumnDataScanState result_scan_state; - pf_sink.results->Scan(result_scan_state, result); + std::cout << "GetData" << std::endl; + // + // // If there are no pairs, we're done + // if (pf_sink.results->Count() == 0) { + // return SourceResultType::FINISHED; + // } + // ColumnDataScanState result_scan_state; + // pf_sink.results->Scan(result_scan_state, result); return SourceResultType::FINISHED; } diff --git a/src/core/operator/task/shortest_path_task.cpp b/src/core/operator/task/shortest_path_task.cpp index 32dd793..2287509 100644 --- a/src/core/operator/task/shortest_path_task.cpp +++ b/src/core/operator/task/shortest_path_task.cpp @@ -6,7 +6,7 @@ namespace core { ShortestPathTask::ShortestPathTask(shared_ptr event_p, ClientContext &context, - GlobalBFSState &state, idx_t worker_id, + shared_ptr &state, idx_t worker_id, const PhysicalOperator &op_p) : ExecutorTask(context, std::move(event_p), op_p), context(context), state(state), worker_id(worker_id) { @@ -14,7 +14,7 @@ ShortestPathTask::ShortestPathTask(shared_ptr event_p, } TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { - auto &barrier = state.barrier; + auto &barrier = state->barrier; do { IterativePath(); @@ -22,7 +22,7 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { // Synchronize after IterativePath barrier->Wait(); if (worker_id == 0) { - state.ResetTaskIndex(); + state->ResetTaskIndex(); } barrier->Wait(); @@ -34,33 +34,33 @@ TaskExecutionResult ShortestPathTask::ExecuteTask(TaskExecutionMode mode) { // ResetTaskIndex." << std::endl; barrier->Wait(); if (worker_id == 0) { - state.ResetTaskIndex(); + state->ResetTaskIndex(); // std::cout << "Worker " << worker_id << ": ResetTaskIndex completed." << // std::endl; } barrier->Wait(); // std::cout << "Worker " << worker_id << ": Passed barrier after // ResetTaskIndex." << std::endl; - } while (state.change); + } while (state->change); barrier->Wait(); if (worker_id == 0) { // std::cout << "Worker " << worker_id << " started path construction" << // std::endl; PathConstruction(); - // std::cout << "Worker " << worker_id << " finished path construction" << - // std::endl; + std::cout << "Worker " << worker_id << " finished path construction" << + std::endl; } // Final synchronization before finishing barrier->Wait(); - // std::cout << "Worker " << worker_id << " finishing task" << std::endl; + std::cout << "Worker " << worker_id << " finishing task" << std::endl; event->FinishTask(); return TaskExecutionResult::TASK_FINISHED; } bool ShortestPathTask::SetTaskRange() { - auto task = state.FetchTask(); + auto task = state->FetchTask(); if (task == nullptr) { return false; } @@ -70,15 +70,15 @@ bool ShortestPathTask::SetTaskRange() { } void ShortestPathTask::IterativePath() { - auto &seen = state.seen; - auto &visit = state.iter & 1 ? state.visit1 : state.visit2; - auto &next = state.iter & 1 ? state.visit2 : state.visit1; - auto &barrier = state.barrier; - int64_t *v = (int64_t *)state.csr->v; - vector &e = state.csr->e; - auto &edge_ids = state.csr->edge_ids; - auto &parents_ve = state.parents_ve; - auto &change = state.change; + auto &seen = state->seen; + auto &visit = state->iter & 1 ? state->visit1 : state->visit2; + auto &next = state->iter & 1 ? state->visit2 : state->visit1; + auto &barrier = state->barrier; + int64_t *v = (int64_t *)state->csr->v; + vector &e = state->csr->e; + auto &edge_ids = state->csr->edge_ids; + auto &parents_ve = state->parents_ve; + auto &change = state->change; // Attempt to get a task range bool has_tasks = SetTaskRange(); @@ -97,7 +97,7 @@ void ShortestPathTask::IterativePath() { auto n = e[offset]; auto edge_id = edge_ids[offset]; { - std::lock_guard lock(state.element_locks[n]); + std::lock_guard lock(state->element_locks[n]); next[n] |= visit[i]; } for (auto l = 0; l < LANE_LIMIT; l++) { @@ -115,7 +115,7 @@ void ShortestPathTask::IterativePath() { // Synchronize at the end of the main processing barrier->Wait([&]() { - state.ResetTaskIndex(); + state->ResetTaskIndex(); }); barrier->Wait(); @@ -135,7 +135,7 @@ void ShortestPathTask::IterativePath() { // Synchronize again barrier->Wait([&]() { - state.ResetTaskIndex(); + state->ResetTaskIndex(); }); barrier->Wait(); } @@ -143,67 +143,68 @@ void ShortestPathTask::IterativePath() { void ShortestPathTask::ReachDetect() { // detect lanes that finished for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - int64_t search_num = state.lane_to_num[lane]; + int64_t search_num = state->lane_to_num[lane]; if (search_num >= 0) { // active lane //! Check if dst for a source has been seen - int64_t dst_pos = state.vdata_dst.sel->get_index(search_num); - if (state.seen[state.dst[dst_pos]][lane]) { - state.active--; + int64_t dst_pos = state->vdata_dst.sel->get_index(search_num); + if (state->seen[state->dst[dst_pos]][lane]) { + state->active--; } } } - if (state.active == 0) { - state.change = false; + if (state->active == 0) { + state->change = false; } // into the next iteration - state.iter++; + state->iter++; } void ShortestPathTask::PathConstruction() { - auto &result = state.result.data[0]; - auto result_data = FlatVector::GetData(result); - auto &result_validity = FlatVector::Validity(result); + auto path_finding_result = make_uniq(); + path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)}); + auto result_data = FlatVector::GetData(path_finding_result->data[0]); + auto &result_validity = FlatVector::Validity(path_finding_result->data[0]); //! Reconstruct the paths for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { - int64_t search_num = state.lane_to_num[lane]; + int64_t search_num = state->lane_to_num[lane]; if (search_num == -1) { // empty lanes continue; } //! Searches that have stopped have found a path - int64_t src_pos = state.vdata_src.sel->get_index(search_num); - int64_t dst_pos = state.vdata_dst.sel->get_index(search_num); - if (state.src[src_pos] == state.dst[dst_pos]) { // Source == destination + int64_t src_pos = state->vdata_src.sel->get_index(search_num); + int64_t dst_pos = state->vdata_dst.sel->get_index(search_num); + if (state->src[src_pos] == state->dst[dst_pos]) { // Source == destination unique_ptr output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); - ListVector::PushBack(*output, state.src[src_pos]); - ListVector::Append(result, ListVector::GetEntry(*output), + ListVector::PushBack(*output, state->src[src_pos]); + ListVector::Append(path_finding_result->data[0], ListVector::GetEntry(*output), ListVector::GetListSize(*output)); result_data[search_num].length = ListVector::GetListSize(*output); - result_data[search_num].offset = state.total_len; - state.total_len += result_data[search_num].length; + result_data[search_num].offset = state->total_len; + state->total_len += result_data[search_num].length; continue; } std::vector output_vector; std::vector output_edge; - auto source_v = state.src[src_pos]; // Take the source + auto source_v = state->src[src_pos]; // Take the source - auto parent_vertex = state.parents_ve[state.dst[dst_pos]][lane].GetV(); - auto parent_edge = state.parents_ve[state.dst[dst_pos]][lane].GetE(); + auto parent_vertex = state->parents_ve[state->dst[dst_pos]][lane].GetV(); + auto parent_edge = state->parents_ve[state->dst[dst_pos]][lane].GetE(); - output_vector.push_back(state.dst[dst_pos]); // Add destination vertex + output_vector.push_back(state->dst[dst_pos]); // Add destination vertex output_vector.push_back(parent_edge); while (parent_vertex != source_v) { // Continue adding vertices until we // have reached the source vertex //! -1 is used to signify no parent if (parent_vertex == -1 || - parent_vertex == state.parents_ve[parent_vertex][lane].GetV()) { + parent_vertex == state->parents_ve[parent_vertex][lane].GetV()) { result_validity.SetInvalid(search_num); break; } output_vector.push_back(parent_vertex); - parent_edge = state.parents_ve[parent_vertex][lane].GetE(); - parent_vertex = state.parents_ve[parent_vertex][lane].GetV(); + parent_edge = state->parents_ve[parent_vertex][lane].GetE(); + parent_vertex = state->parents_ve[parent_vertex][lane].GetV(); output_vector.push_back(parent_edge); } @@ -219,11 +220,18 @@ void ShortestPathTask::PathConstruction() { } result_data[search_num].length = ListVector::GetListSize(*output); - result_data[search_num].offset = state.total_len; - ListVector::Append(result, ListVector::GetEntry(*output), + result_data[search_num].offset = state->total_len; + ListVector::Append(path_finding_result->data[0], ListVector::GetEntry(*output), ListVector::GetListSize(*output)); - state.total_len += result_data[search_num].length; + state->total_len += result_data[search_num].length; } + path_finding_result->SetCardinality(state->current_pairs_batch->size()); + + state->current_pairs_batch->Fuse(*path_finding_result); + state->current_pairs_batch->Print(); + state->results->Append(*state->current_pairs_batch); + + state->results->Print(); } } // namespace core diff --git a/src/include/duckpgq/core/operator/event/shortest_path_event.hpp b/src/include/duckpgq/core/operator/event/shortest_path_event.hpp index 019d75e..37ab332 100644 --- a/src/include/duckpgq/core/operator/event/shortest_path_event.hpp +++ b/src/include/duckpgq/core/operator/event/shortest_path_event.hpp @@ -15,16 +15,16 @@ namespace duckpgq { namespace core { -class PathFindingGlobalState; class ShortestPathEvent : public BasePipelineEvent { public: - explicit ShortestPathEvent(GlobalBFSState &gbfs_state_p, Pipeline &pipeline_p, const PhysicalPathFinding& op_p); + explicit ShortestPathEvent(shared_ptr gbfs_state_p, Pipeline &pipeline_p, const PhysicalPathFinding& op_p); void Schedule() override; + void FinishEvent() override; private: - GlobalBFSState &gbfs_state; + shared_ptr gbfs_state; const PhysicalPathFinding &op; }; diff --git a/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp b/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp index c03c3ca..1388c86 100644 --- a/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp +++ b/src/include/duckpgq/core/operator/physical_path_finding_operator.hpp @@ -74,9 +74,6 @@ class PhysicalPathFinding : public PhysicalComparisonJoin { bool ParallelSink() const override { return true; } void BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) override; - - // Schedules tasks to calculate the next iteration of the path-finding - void ScheduleBFSEvent(Pipeline &pipeline, Event &event, GlobalSinkState &state) const; }; //===--------------------------------------------------------------------===// @@ -92,14 +89,13 @@ class PathFindingLocalSinkState : public LocalSinkState { }; -class GlobalBFSState { +class GlobalBFSState : public enable_shared_from_this { public: - GlobalBFSState(DataChunk &pairs_, CSR* csr_, int64_t vsize_, + GlobalBFSState(unique_ptr &pairs_, CSR* csr_, int64_t vsize_, idx_t num_threads_, string mode_, ClientContext &context_); - void ScheduleBFSEvent(Pipeline &pipeline, Event &event, - GlobalSinkState &state, const PhysicalPathFinding *op); + void ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op); void Clear(); @@ -109,11 +105,13 @@ class GlobalBFSState { pair BoundaryCalculation(idx_t worker_id) const; CSR *csr; - DataChunk &pairs; + unique_ptr &pairs; // (src, dst) pairs + unique_ptr current_pairs_batch; + const PhysicalPathFinding *op; int64_t iter; - int64_t v_size; + int64_t v_size; // Number of vertices bool change; - idx_t started_searches; + idx_t started_searches; // Number of started searches in current batch int64_t total_len; int64_t *src; int64_t *dst; @@ -121,20 +119,25 @@ class GlobalBFSState { UnifiedVectorFormat vdata_dst; int64_t lane_to_num[LANE_LIMIT]; idx_t active = 0; - DataChunk result; + unique_ptr results; // results of (src, dst, path-finding) + ColumnDataScanState scan_state; + ColumnDataAppendState append_state; ClientContext &context; vector> seen; vector> visit1; vector> visit2; vector> parents_ve; + idx_t total_pairs_processed; idx_t num_threads; + idx_t scheduled_threads; + // task_queues[workerId] = {curTaskIdx, queuedTasks} // queuedTasks[curTaskIx] = {start, end} vector> global_task_queue; - std::mutex queue_mutex; // Mutex for synchronizing access - std::condition_variable queue_cv; // Condition variable for task availability - size_t current_task_index = 0; // Index to track the current task + std::mutex queue_mutex; // Mutex for synchronizing access + std::condition_variable queue_cv; // Condition variable for task availability + size_t current_task_index = 0; // Index to track the current task int64_t split_size = 256; unique_ptr barrier; @@ -155,17 +158,11 @@ class PathFindingGlobalSinkState : public GlobalSinkState { // pairs is a 2-column table with src and dst unique_ptr global_pairs; unique_ptr global_csr_column_data; - - unique_ptr results; // results of the path-finding - - ColumnDataScanState scan_state; - ColumnDataAppendState append_state; CSR* csr; int32_t csr_id; size_t child; string mode; ClientContext &context_; - idx_t pairs_processed; idx_t num_threads; }; diff --git a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp index af0db4e..1ea576c 100644 --- a/src/include/duckpgq/core/operator/task/shortest_path_task.hpp +++ b/src/include/duckpgq/core/operator/task/shortest_path_task.hpp @@ -9,7 +9,7 @@ namespace core { class ShortestPathTask : public ExecutorTask { public: ShortestPathTask(shared_ptr event_p, ClientContext &context, - GlobalBFSState &state, idx_t worker_id, + shared_ptr &state, idx_t worker_id, const PhysicalOperator &op_p); TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; @@ -24,7 +24,7 @@ class ShortestPathTask : public ExecutorTask { bool SetTaskRange(); ClientContext &context; - GlobalBFSState &state; + shared_ptr &state; // [left, right) idx_t left; idx_t right;