diff --git a/duckdb-pgq b/duckdb-pgq index e55aa6d8..8bd29f5a 160000 --- a/duckdb-pgq +++ b/duckdb-pgq @@ -1 +1 @@ -Subproject commit e55aa6d85d7f3162f17215ad77cc8e5c32f9c87e +Subproject commit 8bd29f5ac550cc57ded6073d6a9629133f137206 diff --git a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp index a4067c98..734469c2 100644 --- a/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp +++ b/duckpgq/include/duckpgq/operators/physical_path_finding_operator.hpp @@ -91,6 +91,9 @@ class PhysicalPathFinding : public PhysicalComparisonJoin { //! Local copy of the expression executor ExpressionExecutor executor; + //! Final result for the path-finding pairs + DataChunk local_results; + }; public: diff --git a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp index b5b3c955..591407a0 100644 --- a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp +++ b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp @@ -139,7 +139,7 @@ void PhysicalPathFinding::LocalCompressedSparseRow::Sink( Vector result = Vector(src.GetType()); ValidityMask &result_validity = FlatVector::Validity(result); result.SetVectorType(VectorType::FLAT_VECTOR); - auto result_data = FlatVector::GetData(result); + auto result_data = FlatVector::GetData(result); vector> seen(v_size); vector> visit1(v_size); @@ -210,7 +210,9 @@ void PhysicalPathFinding::LocalCompressedSparseRow::Sink( } } } - result.Print(); + local_results.data.emplace_back(result); + local_results.SetCardinality(input); + local_results.Print(); return; } CreateCSR(input, global_csr); @@ -253,6 +255,8 @@ class PathFindingGlobalState : public GlobalSinkState { unique_ptr global_csr; size_t child; + DataChunk result; + }; unique_ptr @@ -288,6 +292,7 @@ PhysicalPathFinding::Combine(ExecutionContext &context, auto &lstate = input.local_state.Cast(); auto &client_profiler = QueryProfiler::Get(context.client); + gstate.result.Move(lstate.local_csr.local_results); client_profiler.Flush(context.thread.profiler); return SinkCombineResultType::FINISHED; @@ -326,8 +331,6 @@ class PathFindingLocalSourceState : public LocalSourceState { } const PhysicalPathFinding &op; - - DataChunk pf_results; }; class PathFindingGlobalSourceState : public GlobalSourceState { @@ -345,10 +348,8 @@ class PathFindingGlobalSourceState : public GlobalSourceState { public: idx_t MaxThreads() override { - // We can't leverage any more threads than block pairs. const auto &sink_state = (op.sink_state->Cast()); return 1; - } void GetNextPair(ClientContext &client, PathFindingGlobalState &gstate, @@ -378,11 +379,10 @@ PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result, auto &pf_sink = sink_state->Cast(); auto &pf_gstate = input.global_state.Cast(); auto &pf_lstate = input.local_state.Cast(); - + result.Move(pf_sink.result); result.Print(); pf_gstate.Initialize(pf_sink); - return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; }