Skip to content

Commit

Permalink
Fixed error when more than one vector worth of pairs was present
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Dec 4, 2024
1 parent 07e4bfa commit 7dbd1df
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 153 deletions.
1 change: 1 addition & 0 deletions src/core/operator/event/shortest_path_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ void ShortestPathEvent::Schedule() {
void ShortestPathEvent::FinishEvent() {
  // Compare the number of pairs handled so far against the total number of
  // input pairs; when nothing is left there is no follow-up work to schedule.
  const bool pairs_remaining =
      gbfs_state->total_pairs_processed < gbfs_state->pairs->Count();
  if (!pairs_remaining) {
    return;
  }

  // Pairs remain: call Clear() on the BFS state before scheduling the BFS
  // event for the next batch on the same pipeline and operator.
  gbfs_state->Clear();
  gbfs_state->ScheduleBFSEvent(*pipeline, *this, gbfs_state->op);
}
Expand Down
35 changes: 18 additions & 17 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ GlobalBFSState::GlobalBFSState(unique_ptr<ColumnDataCollection> &pairs_, CSR* cs
}

results = make_uniq<ColumnDataCollection>(Allocator::Get(context), types);
current_pairs_batch = make_uniq<DataChunk>();
results->InitializeScan(result_scan_state);

// Only have to initialize the current batch and state once.
pairs->InitializeScan(scan_state);
pairs->InitializeScanChunk(scan_state, *current_pairs_batch);
pairs->InitializeScan(input_scan_state);


CreateTasks();
scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size());
Expand Down Expand Up @@ -251,9 +251,9 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
return SinkFinalizeType::READY;
}

auto global_bfs_state = make_shared_ptr<GlobalBFSState>(global_tasks, gstate.csr, gstate.csr->vsize-2,
gstate.global_bfs_state = make_shared_ptr<GlobalBFSState>(global_tasks, gstate.csr, gstate.csr->vsize-2,
gstate.num_threads, mode, context);
global_bfs_state->ScheduleBFSEvent(pipeline, event, this);
gstate.global_bfs_state->ScheduleBFSEvent(pipeline, event, this);

// Move to the next input child
++gstate.child;
Expand All @@ -262,8 +262,9 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
}

void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) {
current_pairs_batch->Reset();
pairs->Scan(scan_state, *current_pairs_batch);
current_pairs_batch = make_uniq<DataChunk>();
pairs->InitializeScanChunk(input_scan_state, *current_pairs_batch);
pairs->Scan(input_scan_state, *current_pairs_batch);
op = op_;

started_searches = 0; // reset
Expand Down Expand Up @@ -374,16 +375,16 @@ PhysicalPathFinding::GetLocalSourceState(ExecutionContext &context,
SourceResultType PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
OperatorSourceInput &input) const {
auto &pf_sink = sink_state->Cast<PathFindingGlobalSinkState>();
std::cout << "GetData" << std::endl;
//
// // If there are no pairs, we're done
// if (pf_sink.results->Count() == 0) {
// return SourceResultType::FINISHED;
// }
// ColumnDataScanState result_scan_state;
// pf_sink.results->Scan(result_scan_state, result);

return SourceResultType::FINISHED;

// If there are no pairs, we're done
if (pf_sink.global_bfs_state->results->Count() == 0) {
return SourceResultType::FINISHED;
}
pf_sink.global_bfs_state->results->Scan(pf_sink.global_bfs_state->result_scan_state, result);
if (pf_sink.global_bfs_state->result_scan_state.current_row_index == pf_sink.global_bfs_state->results->Count()) {
return SourceResultType::FINISHED;
}
return SourceResultType::HAVE_MORE_OUTPUT;
}

//===--------------------------------------------------------------------===//
Expand Down
8 changes: 4 additions & 4 deletions src/core/operator/task/shortest_path_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ void ShortestPathTask::PathConstruction() {
path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)});
auto result_data = FlatVector::GetData<list_entry_t>(path_finding_result->data[0]);
auto &result_validity = FlatVector::Validity(path_finding_result->data[0]);
size_t list_len = 0;
//! Reconstruct the paths
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = state->lane_to_num[lane];
Expand All @@ -182,7 +183,7 @@ void ShortestPathTask::PathConstruction() {
ListVector::GetListSize(*output));
result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = state->total_len;
state->total_len += result_data[search_num].length;
list_len += result_data[search_num].length;
continue;
}
std::vector<int64_t> output_vector;
Expand Down Expand Up @@ -220,17 +221,16 @@ void ShortestPathTask::PathConstruction() {
}

result_data[search_num].length = ListVector::GetListSize(*output);
result_data[search_num].offset = state->total_len;
result_data[search_num].offset = list_len;
ListVector::Append(path_finding_result->data[0], ListVector::GetEntry(*output),
ListVector::GetListSize(*output));
state->total_len += result_data[search_num].length;
list_len += result_data[search_num].length;
}
path_finding_result->SetCardinality(state->current_pairs_batch->size());

state->current_pairs_batch->Fuse(*path_finding_result);
state->current_pairs_batch->Print();
state->results->Append(*state->current_pairs_batch);

state->results->Print();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
namespace duckpgq {

namespace core {
class GlobalBFSState;

class PhysicalPathFinding : public PhysicalComparisonJoin {
#define LANE_LIMIT 512
Expand All @@ -29,7 +30,7 @@ class PhysicalPathFinding : public PhysicalComparisonJoin {
unique_ptr<PhysicalOperator> pairs,
unique_ptr<PhysicalOperator> csr);

static constexpr PhysicalOperatorType TYPE =
static constexpr PhysicalOperatorType TYPE =
PhysicalOperatorType::EXTENSION;
vector<unique_ptr<Expression>> expressions;
string mode; // "iterativelength" or "shortestpath"
Expand Down Expand Up @@ -120,7 +121,8 @@ class GlobalBFSState : public enable_shared_from_this<GlobalBFSState> {
int64_t lane_to_num[LANE_LIMIT];
idx_t active = 0;
unique_ptr<ColumnDataCollection> results; // results of (src, dst, path-finding)
ColumnDataScanState scan_state;
ColumnDataScanState result_scan_state;
ColumnDataScanState input_scan_state;
ColumnDataAppendState append_state;
ClientContext &context;
vector<std::bitset<LANE_LIMIT>> seen;
Expand Down Expand Up @@ -158,6 +160,7 @@ class PathFindingGlobalSinkState : public GlobalSinkState {
// pairs is a 2-column table with src and dst
unique_ptr<ColumnDataCollection> global_pairs;
unique_ptr<ColumnDataCollection> global_csr_column_data;
shared_ptr<GlobalBFSState> global_bfs_state;
CSR* csr;
int32_t csr_id;
size_t child;
Expand Down
87 changes: 87 additions & 0 deletions test/sql/path_finding/path_finding_small_graph.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@


require duckpgq

statement ok
set experimental_path_finding_operator=true;

statement ok
CREATE TABLE person(id BIGINT);

statement ok
CREATE TABLE person_knows_person(person1id BIGINT, person2id BIGINT);

statement ok
INSERT INTO person(id) VALUES
(0), (1), (2), (3), (4), (5), (6), (7), (8), (9);

statement ok
INSERT INTO person_knows_person(person1id, person2id) VALUES
(0, 1), (1, 2), (2, 3), (3, 4), (4, 5),
(5, 6), (6, 7), (7, 8), (8, 9), (9, 0), -- Circular connections
(0, 2), (1, 3), (2, 4), (3, 5), (4, 6),
(5, 7), (6, 8), (7, 9), (8, 0), (9, 1); -- Additional connections

statement ok
CREATE TABLE pairs(src BIGINT, dst BIGINT);

statement ok
INSERT INTO pairs(src, dst) VALUES
(0,2), (2,4), (4,6), (6,8), (8,0),
(1,3), (3,5), (5,7), (7,9), (9,1), -- Existing connections
(0,3), (1,4), (2,5), (3,6), (4,7), -- Additional connections
(5,8), (6,9), (7,0), (8,1), (9,2), -- Wrap-around connections
(0,4), (1,5), (2,6), (3,7), (4,8); -- Further extended connections

query III
with csr_cte as (
SELECT cast(min(create_csr_edge(
0,
(SELECT count(a.id) FROM person a),
CAST (
(SELECT sum(create_csr_vertex(
0,
(SELECT count(a.id) FROM person a),
sub.dense_id,
sub.cnt))
FROM (
SELECT a.rowid as dense_id, count(k.person1id) as cnt
FROM person a
LEFT JOIN person_knows_person k ON k.person1id = a.id
GROUP BY a.rowid) sub
)
AS BIGINT),
(select count() FROM person_knows_person k JOIN person a on a.id = k.person1id JOIN person c on c.id = k.person2id),
a.rowid,
c.rowid,
k.rowid)) as bigint) as csr_id
FROM person_knows_person k
JOIN person a on a.id = k.person1id
JOIN person c on c.id = k.person2id)
SELECT src, dst, shortestpathoperator(src, dst, csr_id) FROM pairs, csr_cte;
----
0 2 [0, 10, 2]
2 4 [2, 12, 4]
4 6 [4, 14, 6]
6 8 [6, 16, 8]
8 0 [8, 18, 0]
1 3 [1, 11, 3]
3 5 [3, 13, 5]
5 7 [5, 15, 7]
7 9 [7, 17, 9]
9 1 [9, 19, 1]
0 3 [0, 0, 1, 11, 3]
1 4 [1, 1, 2, 12, 4]
2 5 [2, 2, 3, 13, 5]
3 6 [3, 3, 4, 14, 6]
4 7 [4, 4, 5, 15, 7]
5 8 [5, 5, 6, 16, 8]
6 9 [6, 6, 7, 17, 9]
7 0 [7, 7, 8, 18, 0]
8 1 [8, 18, 0, 0, 1]
9 2 [9, 9, 0, 10, 2]
0 4 [0, 10, 2, 12, 4]
1 5 [1, 11, 3, 13, 5]
2 6 [2, 12, 4, 14, 6]
3 7 [3, 13, 5, 15, 7]
4 8 [4, 14, 6, 16, 8]
Loading

0 comments on commit 7dbd1df

Please sign in to comment.