diff --git a/duckpgq/src/duckpgq/operators/logical_path_finding_operator.cpp b/duckpgq/src/duckpgq/operators/logical_path_finding_operator.cpp index 0ef3dabd..3202405f 100644 --- a/duckpgq/src/duckpgq/operators/logical_path_finding_operator.cpp +++ b/duckpgq/src/duckpgq/operators/logical_path_finding_operator.cpp @@ -23,6 +23,7 @@ void LogicalPathFindingOperator::ResolveTypes() { types = children[0]->types; auto right_types = children[1]->types; types.insert(types.end(), right_types.begin(), right_types.end()); + // types = {LogicalType::BIGINT, LogicalType::BIGINT}; } string LogicalPathFindingOperator::ParamsToString() const { diff --git a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp index 51e4bf38..21a8200b 100644 --- a/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp +++ b/duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp @@ -70,9 +70,9 @@ class PathFindingLocalState : public LocalSinkState { if (global_csr.is_ready) { // Add the tasks (src, dst) to sink // Optimizations: Eliminate duplicate sources/destinations - input.Print(); + // input.Print(); local_tasks.Append(input); - local_tasks.Print(); + // local_tasks.Print(); return; } CreateCSR(input, global_csr); @@ -112,7 +112,7 @@ void PathFindingLocalState::CreateCSR(DataChunk &input, global_csr.edge_ids[static_cast(pos) - 1] = edge_id; return 1; }); - global_csr.Print(); // Debug print + // global_csr.Print(); // Debug print } class PathFindingGlobalState : public GlobalSinkState { @@ -129,18 +129,22 @@ class PathFindingGlobalState : public GlobalSinkState { PathFindingGlobalState(PathFindingGlobalState &prev) : GlobalSinkState(prev), global_tasks(prev.global_tasks), + scan_state(prev.scan_state), append_state(prev.append_state), global_csr(std::move(prev.global_csr)), child(prev.child + 1) {} void Sink(DataChunk &input, PathFindingLocalState &lstate) const { lstate.Sink(input, *global_csr); } - unique_ptr global_csr; - size_t child; - + // pairs is a 2-column table with src and dst ColumnDataCollection global_tasks; + // pairs with path exists + // ColumnDataCollection global_results; ColumnDataScanState scan_state; ColumnDataAppendState append_state; + + unique_ptr global_csr; + size_t child; }; unique_ptr @@ -178,18 +182,163 @@ PhysicalPathFinding::Combine(ExecutionContext &context, gstate.global_tasks.Combine(lstate.local_tasks); client_profiler.Flush(context.thread.profiler); - gstate.global_tasks.Print(); + // gstate.global_tasks.Print(); return SinkCombineResultType::FINISHED; } //===--------------------------------------------------------------------===// // Finalize //===--------------------------------------------------------------------===// + +static bool IterativeLength(int64_t v_size, int64_t *v, vector &e, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + for (auto i = 0; i < v_size; i++) { + next[i] = 0; + } + for (auto i = 0; i < v_size; i++) { + if (visit[i].any()) { + for (auto offset = v[i]; offset < v[i + 1]; offset++) { + auto n = e[offset]; + next[n] = next[n] | visit[i]; + } + } + } + for (auto i = 0; i < v_size; i++) { + next[i] = next[i] & ~seen[i]; + seen[i] = seen[i] | next[i]; + change |= next[i].any(); + } + return change; +} + +static void IterativeLengthFunction(const unique_ptr &csr, + DataChunk &pairs, Vector &result) { + int64_t v_size = csr->v_size; + int64_t *v = (int64_t *)csr->v; + vector &e = csr->e; + + // get src and dst vectors for searches + auto &src = pairs.data[0]; + auto &dst = pairs.data[1]; + UnifiedVectorFormat vdata_src; + UnifiedVectorFormat vdata_dst; + src.ToUnifiedFormat(pairs.size(), vdata_src); + dst.ToUnifiedFormat(pairs.size(), vdata_dst); + + auto src_data = FlatVector::GetData(src); + auto dst_data = FlatVector::GetData(dst); + + ValidityMask &result_validity = FlatVector::Validity(result); + + // create result vector + result.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result); + + // create temp SIMD arrays + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + // maps lane to search number + short lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + + idx_t started_searches = 0; + while (started_searches < pairs.size()) { + + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + + // add search jobs to free lanes + uint64_t active = 0; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < pairs.size()) { + int64_t search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + result_data[search_num] = (uint64_t)-1; /* no path */ + } else if (src_data[src_pos] == dst_data[dst_pos]) { + result_data[search_num] = + (uint64_t)0; // path of length 0 does not require a search + } else { + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } + } + } + + // make passes while a lane is still active + for (int64_t iter = 1; active; iter++) { + if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } + // detect lanes that finished + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (seen[dst_data[dst_pos]][lane]) { + result_data[search_num] = + iter; /* found at iter => iter = path length */ + lane_to_num[lane] = -1; // mark inactive + active--; + } + } + } + } + + // no changes anymore: any still active searches have no path + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + result_validity.SetInvalid(search_num); + result_data[search_num] = (int64_t)-1; /* no path */ + lane_to_num[lane] = -1; // mark inactive + } + } + } +} + + SinkFinalizeType PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, OperatorSinkFinalizeInput &input) const { auto &gstate = input.global_state.Cast(); + auto &csr = gstate.global_csr; + auto &global_tasks = gstate.global_tasks; + if (global_tasks.Count() != 0) { + DataChunk pairs; + global_tasks.InitializeScanChunk(pairs); + ColumnDataScanState scan_state; + global_tasks.InitializeScan(scan_state); + while (global_tasks.Scan(scan_state, pairs)) { + Vector result(LogicalType::BIGINT, true, true); + IterativeLengthFunction(csr, pairs, result); + // store the result + // gstate.global_results.InitializeAppend(gstate.append_state); + // gstate.global_results.Append(gstate.append_state, pairs); + // // debug print + // gstate.global_results.Print(); + } + } + + // Move to the next input child + ++gstate.child; return SinkFinalizeType::READY; } diff --git a/test/sql/path-finding/parallel_path_finding.test b/test/sql/path-finding/parallel_path_finding.test index 400e485b..378d7bab 100644 --- a/test/sql/path-finding/parallel_path_finding.test +++ b/test/sql/path-finding/parallel_path_finding.test @@ -8,7 +8,7 @@ require duckpgq statement ok -CREATE TABLE pairs(src INT, dst INT); INSERT INTO pairs(src, dst) VALUES (0, 1), (1, 2), (2,0); +CREATE TABLE pairs(src BIGINT, dst BIGINT); INSERT INTO pairs(src, dst) VALUES (0, 1), (1, 2), (2,0); statement ok create table student(id INT); INSERT INTO student(id) VALUES (10), (20), (30), (40); @@ -38,7 +38,7 @@ knows SOURCE KEY (src) REFERENCES student (id) # COLUMNS (*) # ); -statement ok +query II SELECT * FROM pairs AS p WHERE p.src BETWEEN (SELECT CREATE_CSR_EDGE( @@ -55,6 +55,7 @@ WHERE p.src BETWEEN (SELECT CREATE_CSR_EDGE( LEFT JOIN knows k ON k.src = a.id GROUP BY a.rowid) t ON t.a_rowid = a.rowid) AND p.dst; +---- # CAST (