Skip to content

Commit

Permalink
Move leftover methods to bfs_state.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Dec 12, 2024
1 parent f75a9f4 commit 76352b0
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 80 deletions.
83 changes: 83 additions & 0 deletions src/core/operator/bfs_state.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckpgq/core/operator/bfs_state.hpp"

#include <duckpgq/core/operator/event/shortest_path_event.hpp>
#include <duckpgq/core/utils/compressed_sparse_row.hpp>
#include <duckpgq/core/utils/duckpgq_barrier.hpp>
#include <duckpgq/core/utils/duckpgq_utils.hpp>
Expand Down Expand Up @@ -127,6 +128,88 @@ pair<idx_t, idx_t> GlobalBFSState::BoundaryCalculation(idx_t worker_id) const {
return {left, right};
}

void GlobalBFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event) {
std::cout << "Starting BFS for " << current_pairs_batch->size() << " pairs with " << LANE_LIMIT << " batch size." << std::endl;
std::cout << "Number of started searches: " << started_searches << std::endl;
auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]);
std::bitset<LANE_LIMIT> seen_mask;
seen_mask.set();

for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < current_pairs_batch->size()) {
auto search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
} else {
visit1[src[src_pos]][lane] = true;
// bfs_state->seen[bfs_state->src[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
seen_mask[lane] = false;
break;
}
}
}
for (int64_t i = 0; i < v_size; i++) {
seen[i] = seen_mask;
}

if (mode == "iterativelength") {
throw NotImplementedException("Iterative length has not been implemented yet");
// event.InsertEvent(
// make_shared_ptr<ParallelIterativeEvent>(gstate, pipeline, *this));
} else if (mode == "shortestpath") {
std::cout << "Scheduling shortest path event" << std::endl;
event.InsertEvent(
make_shared_ptr<ShortestPathEvent>(shared_from_this(), pipeline, *op));
} else {
throw NotImplementedException("Mode not supported");
}
}


void GlobalBFSState::InitializeBFS(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) {
current_pairs_batch = make_uniq<DataChunk>();
pairs->InitializeScanChunk(input_scan_state, *current_pairs_batch);
pairs->Scan(input_scan_state, *current_pairs_batch);
op = op_;

path_finding_result = make_uniq<DataChunk>();
path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)});
current_batch_path_list_len = 0;

started_searches = 0; // reset
active = 0;

auto &src_data = current_pairs_batch->data[0];
auto &dst_data = current_pairs_batch->data[1];
src_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_src);
dst_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_dst);
src = FlatVector::GetData<int64_t>(src_data);
dst = FlatVector::GetData<int64_t>(dst_data);

// remaining pairs for current batch
while (started_searches < current_pairs_batch->size()) {
ScheduleBFSBatch(pipeline, event);
Clear();
}
if (started_searches != current_pairs_batch->size()) {
throw InternalException("Number of started searches does not match the number of pairs");
}
// path_finding_result->SetCardinality(current_pairs_batch->size());
// path_finding_result->Print();
// current_pairs_batch->Fuse(*path_finding_result);
// current_pairs_batch->Print();
// results->Append(*current_pairs_batch);
// total_pairs_processed += current_pairs_batch->size();
// std::cout << "Total pairs processed: " << total_pairs_processed << std::endl;
// if (total_pairs_processed < pairs->Count()) {
// InitializeBFS(pipeline, event, op);
// }
}

} // namespace core

} // namespace duckpgq
81 changes: 1 addition & 80 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void PathFindingLocalSinkState::Sink(DataChunk &input, idx_t child) {
if (child == 1) {
// Add the tasks (src, dst) to sink
// Optimizations: Eliminate duplicate sources/destinations
// - For example: group by source, list(destination)
local_pairs.Append(input);
}
}
Expand Down Expand Up @@ -141,87 +142,7 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
return SinkFinalizeType::READY;
}

void GlobalBFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event) {
std::cout << "Starting BFS for " << current_pairs_batch->size() << " pairs with " << LANE_LIMIT << " batch size." << std::endl;
std::cout << "Number of started searches: " << started_searches << std::endl;
auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]);
std::bitset<LANE_LIMIT> seen_mask;
seen_mask.set();

for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < current_pairs_batch->size()) {
auto search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
} else {
visit1[src[src_pos]][lane] = true;
// bfs_state->seen[bfs_state->src[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
seen_mask[lane] = false;
break;
}
}
}
for (int64_t i = 0; i < v_size; i++) {
seen[i] = seen_mask;
}

if (mode == "iterativelength") {
throw NotImplementedException("Iterative length has not been implemented yet");
// event.InsertEvent(
// make_shared_ptr<ParallelIterativeEvent>(gstate, pipeline, *this));
} else if (mode == "shortestpath") {
std::cout << "Scheduling shortest path event" << std::endl;
event.InsertEvent(
make_shared_ptr<ShortestPathEvent>(shared_from_this(), pipeline, *op));
} else {
throw NotImplementedException("Mode not supported");
}
}


void GlobalBFSState::InitializeBFS(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) {
current_pairs_batch = make_uniq<DataChunk>();
pairs->InitializeScanChunk(input_scan_state, *current_pairs_batch);
pairs->Scan(input_scan_state, *current_pairs_batch);
op = op_;

path_finding_result = make_uniq<DataChunk>();
path_finding_result->Initialize(context, {LogicalType::LIST(LogicalType::BIGINT)});
current_batch_path_list_len = 0;

started_searches = 0; // reset
active = 0;

auto &src_data = current_pairs_batch->data[0];
auto &dst_data = current_pairs_batch->data[1];
src_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_src);
dst_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_dst);
src = FlatVector::GetData<int64_t>(src_data);
dst = FlatVector::GetData<int64_t>(dst_data);

// remaining pairs for current batch
while (started_searches < current_pairs_batch->size()) {
ScheduleBFSBatch(pipeline, event);
Clear();
}
if (started_searches != current_pairs_batch->size()) {
throw InternalException("Number of started searches does not match the number of pairs");
}
// path_finding_result->SetCardinality(current_pairs_batch->size());
// path_finding_result->Print();
// current_pairs_batch->Fuse(*path_finding_result);
// current_pairs_batch->Print();
// results->Append(*current_pairs_batch);
// total_pairs_processed += current_pairs_batch->size();
// std::cout << "Total pairs processed: " << total_pairs_processed << std::endl;
// if (total_pairs_processed < pairs->Count()) {
// InitializeBFS(pipeline, event, op);
// }
}

InsertionOrderPreservingMap<string> PhysicalPathFinding::ParamsToString() const {
InsertionOrderPreservingMap<string> result;
Expand Down

0 comments on commit 76352b0

Please sign in to comment.