Skip to content

Commit

Permalink
Appending datachunk after every batch of pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
Dtenwolde committed Dec 3, 2024
1 parent a13b54e commit 07e4bfa
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 150 deletions.
15 changes: 10 additions & 5 deletions src/core/operator/event/shortest_path_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@
namespace duckpgq {
namespace core {

ShortestPathEvent::ShortestPathEvent(GlobalBFSState &gbfs_state_p,
ShortestPathEvent::ShortestPathEvent(shared_ptr<GlobalBFSState> gbfs_state_p,
Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
: BasePipelineEvent(pipeline_p), gbfs_state(gbfs_state_p), op(op_p) {
: BasePipelineEvent(pipeline_p), gbfs_state(std::move(gbfs_state_p)), op(op_p) {

}

void ShortestPathEvent::Schedule() {
auto &context = pipeline->GetClientContext();

std::cout << gbfs_state->csr->ToString();
vector<shared_ptr<Task>> bfs_tasks;
size_t threads_to_schedule = std::min(gbfs_state.num_threads, (idx_t)gbfs_state.global_task_queue.size());
for (idx_t tnum = 0; tnum < threads_to_schedule; tnum++) {
for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
bfs_tasks.push_back(make_uniq<ShortestPathTask>(
shared_from_this(), context, gbfs_state, tnum, op));
}
SetTasks(std::move(bfs_tasks));
}

void ShortestPathEvent::FinishEvent() {
// if remaining pairs, schedule the BFS for the next batch
if (gbfs_state->total_pairs_processed < gbfs_state->pairs->Count()) {
gbfs_state->ScheduleBFSEvent(*pipeline, *this, gbfs_state->op);
}
}

} // namespace core
} // namespace duckpgq
121 changes: 50 additions & 71 deletions src/core/operator/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,32 @@ PhysicalPathFinding::PhysicalPathFinding(LogicalExtensionOperator &op,
mode = path_finding_op.mode;
}

GlobalBFSState::GlobalBFSState(DataChunk &pairs_, CSR* csr_, int64_t vsize_,
GlobalBFSState::GlobalBFSState(unique_ptr<ColumnDataCollection> &pairs_, CSR* csr_, int64_t vsize_,
idx_t num_threads_, string mode_, ClientContext &context_)
: pairs(pairs_), iter(1), csr(csr_), v_size(vsize_), change(false),
started_searches(0), total_len(0), context(context_), seen(vsize_),
visit1(vsize_), visit2(vsize_), num_threads(num_threads_),
element_locks(vsize_),
mode(std::move(mode_)), parents_ve(vsize_) {
if (mode == "iterativelength") { // length
result.Initialize(
context, {LogicalType::BIGINT},
pairs_.size());
} else if (mode == "shortestpath") { // path
result.Initialize(
context, {LogicalType::LIST(LogicalType::BIGINT)},
pairs_.size());
auto types = pairs->Types();
if (mode == "iterativelength") {
types.push_back(LogicalType::BIGINT);
} else if (mode == "shortestpath") {
types.push_back(LogicalType::LIST(LogicalType::BIGINT));
} else {
throw NotImplementedException("Mode not supported");
}
result.SetCardinality(pairs_.size());

auto &src_data = pairs.data[0];
auto &dst_data = pairs.data[1];
src_data.ToUnifiedFormat(pairs.size(), vdata_src);
dst_data.ToUnifiedFormat(pairs.size(), vdata_dst);
src = FlatVector::GetData<int64_t>(src_data);
dst = FlatVector::GetData<int64_t>(dst_data);
result.SetCapacity(pairs.size());
results = make_uniq<ColumnDataCollection>(Allocator::Get(context), types);
current_pairs_batch = make_uniq<DataChunk>();

// Only have to initialize the the current batch and state once.
pairs->InitializeScan(scan_state);
pairs->InitializeScanChunk(scan_state, *current_pairs_batch);

CreateTasks();
size_t number_of_threads_to_schedule = std::min(num_threads, (idx_t)global_task_queue.size());
barrier = make_uniq<Barrier>(number_of_threads_to_schedule);
scheduled_threads = std::min(num_threads, (idx_t)global_task_queue.size());
barrier = make_uniq<Barrier>(scheduled_threads);
}

void GlobalBFSState::Clear() {
Expand Down Expand Up @@ -107,7 +102,7 @@ void GlobalBFSState::CreateTasks() {
if (current_task_start < (idx_t)v_size) {
global_task_queue.push_back({current_task_start, v_size});
}
// std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl;
std::cout << "Set the number of tasks to " << global_task_queue.size() << std::endl;
}


Expand Down Expand Up @@ -182,16 +177,8 @@ PathFindingGlobalSinkState::PathFindingGlobalSinkState(ClientContext &context,
global_csr_column_data =
make_uniq<ColumnDataCollection>(context, op.children[1]->GetTypes());

auto result_types = op.children[0]->GetTypes();
if (op.mode == "iterativelength") {
result_types.push_back(LogicalType::BIGINT);
} else {
result_types.push_back(LogicalType::LIST(LogicalType::BIGINT));
}
results = make_uniq<ColumnDataCollection>(context, result_types);
child = 0;
mode = op.mode;
pairs_processed = 0;
auto &scheduler = TaskScheduler::GetScheduler(context);
num_threads = scheduler.NumberOfThreads();
}
Expand Down Expand Up @@ -260,41 +247,44 @@ PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
++gstate.child;
return SinkFinalizeType::READY;
}

while (gstate.pairs_processed < global_tasks->Count()) {
DataChunk pairs_to_process;
ColumnDataScanState scan_state;
global_tasks->Scan(scan_state, pairs_to_process);

auto global_bfs_state = make_uniq<GlobalBFSState>(pairs_to_process, gstate.csr, gstate.csr->vsize-2,
gstate.num_threads, mode, context);
global_bfs_state->ScheduleBFSEvent(pipeline, event, gstate, this);
ColumnDataAppendState append_state;
// TODO add back in option to set the task size
gstate.pairs_processed += pairs_to_process.size();
gstate.results->Append(append_state, global_bfs_state->result);
std::cout << "Processed " << gstate.pairs_processed << " pairs" << std::endl;
std::cout << "Total size: " << global_tasks->Count() << std::endl;
if (global_tasks->Count() == 0) {
return SinkFinalizeType::READY;
}

auto global_bfs_state = make_shared_ptr<GlobalBFSState>(global_tasks, gstate.csr, gstate.csr->vsize-2,
gstate.num_threads, mode, context);
global_bfs_state->ScheduleBFSEvent(pipeline, event, this);

// Move to the next input child
++gstate.child;
duckpgq_state->csr_to_delete.insert(gstate.csr_id);
return SinkFinalizeType::READY;
}

void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event,
GlobalSinkState &state, const PhysicalPathFinding *op) {
auto &gstate = state.Cast<PathFindingGlobalSinkState>();
// remaining pairs
if (started_searches < pairs.size()) {
auto &result_validity = FlatVector::Validity(result.data[0]);
void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op_) {
current_pairs_batch->Reset();
pairs->Scan(scan_state, *current_pairs_batch);
op = op_;

started_searches = 0; // reset
active = 0;

auto &src_data = current_pairs_batch->data[0];
auto &dst_data = current_pairs_batch->data[1];
src_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_src);
dst_data.ToUnifiedFormat(current_pairs_batch->size(), vdata_dst);
src = FlatVector::GetData<int64_t>(src_data);
dst = FlatVector::GetData<int64_t>(dst_data);

// remaining pairs for current batch
if (started_searches < current_pairs_batch->size()) {
auto &result_validity = FlatVector::Validity(current_pairs_batch->data[0]);
std::bitset<LANE_LIMIT> seen_mask;
seen_mask.set();

for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < pairs.size()) {
while (started_searches < current_pairs_batch->size()) {
auto search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
Expand All @@ -317,9 +307,9 @@ void GlobalBFSState::ScheduleBFSEvent(Pipeline &pipeline, Event &event,
throw NotImplementedException("Iterative length has not been implemented yet");
// event.InsertEvent(
// make_shared_ptr<ParallelIterativeEvent>(gstate, pipeline, *this));
} else if (gstate.mode == "shortestpath") {
} else if (mode == "shortestpath") {
event.InsertEvent(
make_shared_ptr<ShortestPathEvent>(*this, pipeline, op));
make_shared_ptr<ShortestPathEvent>(shared_from_this(), pipeline, *op));
} else {
throw NotImplementedException("Mode not supported");
}
Expand Down Expand Up @@ -360,22 +350,10 @@ class PathFindingGlobalSourceState : public GlobalSourceState {
explicit PathFindingGlobalSourceState(const PhysicalPathFinding &op)
: op(op), initialized(false) {}

void Initialize(PathFindingGlobalState &sink_state) {
lock_guard<mutex> initializing(lock);
if (initialized) {
return;
}
initialized = true;
}

public:
idx_t MaxThreads() override {
return 1;
}

void GetNextPair(ClientContext &client, PathFindingGlobalState &gstate,
PathFindingLocalSourceState &lstate) {}

const PhysicalPathFinding &op;

mutex lock;
Expand All @@ -396,13 +374,14 @@ PhysicalPathFinding::GetLocalSourceState(ExecutionContext &context,
SourceResultType PhysicalPathFinding::GetData(ExecutionContext &context, DataChunk &result,
OperatorSourceInput &input) const {
auto &pf_sink = sink_state->Cast<PathFindingGlobalSinkState>();

// If there are no pairs, we're done
if (pf_sink.results->Count() == 0) {
return SourceResultType::FINISHED;
}
ColumnDataScanState result_scan_state;
pf_sink.results->Scan(result_scan_state, result);
std::cout << "GetData" << std::endl;
//
// // If there are no pairs, we're done
// if (pf_sink.results->Count() == 0) {
// return SourceResultType::FINISHED;
// }
// ColumnDataScanState result_scan_state;
// pf_sink.results->Scan(result_scan_state, result);

return SourceResultType::FINISHED;
}
Expand Down
Loading

0 comments on commit 07e4bfa

Please sign in to comment.