diff --git a/duckpgq/include/duckpgq/duckpgq_functions.hpp b/duckpgq/include/duckpgq/duckpgq_functions.hpp index 0aaaa557..6b789daf 100644 --- a/duckpgq/include/duckpgq/duckpgq_functions.hpp +++ b/duckpgq/include/duckpgq/duckpgq_functions.hpp @@ -27,8 +27,10 @@ class DuckPGQFunctions { functions.push_back(GetCsrEdgeFunction()); functions.push_back(GetCheapestPathLengthFunction()); functions.push_back(GetShortestPathFunction()); + functions.push_back(GetShortestPathLowerBoundFunction()); functions.push_back(GetReachabilityFunction()); functions.push_back(GetIterativeLengthFunction()); + functions.push_back(GetIterativeLengthLowerBoundFunction()); functions.push_back(GetIterativeLengthBidirectionalFunction()); functions.push_back(GetIterativeLength2Function()); functions.push_back(GetDeleteCsrFunction()); @@ -57,8 +59,10 @@ class DuckPGQFunctions { static CreateScalarFunctionInfo GetCsrEdgeFunction(); static CreateScalarFunctionInfo GetCheapestPathLengthFunction(); static CreateScalarFunctionInfo GetShortestPathFunction(); + static CreateScalarFunctionInfo GetShortestPathLowerBoundFunction(); static CreateScalarFunctionInfo GetReachabilityFunction(); static CreateScalarFunctionInfo GetIterativeLengthFunction(); + static CreateScalarFunctionInfo GetIterativeLengthLowerBoundFunction(); static CreateScalarFunctionInfo GetIterativeLengthBidirectionalFunction(); static CreateScalarFunctionInfo GetIterativeLength2Function(); static CreateScalarFunctionInfo GetDeleteCsrFunction(); diff --git a/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt b/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt index 2a35040f..44c3eea2 100644 --- a/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt +++ b/duckpgq/src/duckpgq/functions/scalar/CMakeLists.txt @@ -5,10 +5,12 @@ set(EXTENSION_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/csr_deletion.cpp ${CMAKE_CURRENT_SOURCE_DIR}/csr_get_w_type.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength_lowerbound.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength2.cpp ${CMAKE_CURRENT_SOURCE_DIR}/iterativelength_bidirectional.cpp ${CMAKE_CURRENT_SOURCE_DIR}/reachability.cpp ${CMAKE_CURRENT_SOURCE_DIR}/shortest_path.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/shortest_path_lowerbound.cpp ${CMAKE_CURRENT_SOURCE_DIR}/csr_creation.cpp PARENT_SCOPE ) \ No newline at end of file diff --git a/duckpgq/src/duckpgq/functions/scalar/iterativelength.cpp b/duckpgq/src/duckpgq/functions/scalar/iterativelength.cpp index 61f56808..7303ec22 100644 --- a/duckpgq/src/duckpgq/functions/scalar/iterativelength.cpp +++ b/duckpgq/src/duckpgq/functions/scalar/iterativelength.cpp @@ -8,7 +8,6 @@ namespace duckdb { static bool IterativeLength(int64_t v_size, int64_t *v, vector &e, - vector>> &parents_v, vector> &seen, vector> &visit, vector> &next) { @@ -16,28 +15,19 @@ static bool IterativeLength(int64_t v_size, int64_t *v, vector &e, for (auto i = 0; i < v_size; i++) { next[i] = 0; } - - for (auto lane = 0; lane < LANE_LIMIT; lane++) { - for (auto i = 0; i < v_size; i++) { - if (visit[i][lane]) { - for (auto offset = v[i]; offset < v[i + 1]; offset++) { - auto n = e[offset]; - if (seen[n][lane] == false || parents_v[i][lane].find(n) == parents_v[i][lane].end()) { - parents_v[n][lane] = parents_v[i][lane]; - parents_v[n][lane].insert(i); - next[n][lane] = true; - } - } + for (auto i = 0; i < v_size; i++) { + if (visit[i].any()) { + for (auto offset = v[i]; offset < v[i + 1]; offset++) { + auto n = e[offset]; + next[n] = next[n] | visit[i]; } } } - for (auto i = 0; i < v_size; i++) { - // next[i] = next[i] & ~seen[i]; + next[i] = next[i] & ~seen[i]; seen[i] = seen[i] | next[i]; change |= next[i].any(); } - return change; } @@ -85,14 +75,10 @@ static void IterativeLengthFunction(DataChunk &args, ExpressionState &state, auto dst_data = (int64_t *)vdata_dst.data; // get lowerbound and upperbound - auto &lower_bound = args.data[4]; - auto &upper_bound = args.data[5]; - UnifiedVectorFormat vdata_lower_bound; + auto &upper = args.data[5]; UnifiedVectorFormat vdata_upper_bound; - lower_bound.ToUnifiedFormat(args.size(), vdata_lower_bound); - upper_bound.ToUnifiedFormat(args.size(), vdata_upper_bound); - auto lower_bound_data = (int64_t *)vdata_lower_bound.data; - auto upper_bound_data = (int64_t *)vdata_upper_bound.data; + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; ValidityMask &result_validity = FlatVector::Validity(result); @@ -104,7 +90,6 @@ static void IterativeLengthFunction(DataChunk &args, ExpressionState &state, vector> seen(v_size); vector> visit1(v_size); vector> visit2(v_size); - vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); // maps lane to search number short lane_to_num[LANE_LIMIT]; @@ -128,21 +113,11 @@ static void IterativeLengthFunction(DataChunk &args, ExpressionState &state, while (started_searches < args.size()) { int64_t search_num = started_searches++; int64_t src_pos = vdata_src.sel->get_index(search_num); - int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (!vdata_src.validity.RowIsValid(src_pos)) { result_validity.SetInvalid(search_num); result_data[search_num] = (int64_t)-1; /* no path */ - } else if (src_data[src_pos] == dst_data[dst_pos]) { - // result_data[search_num] = - // (int64_t)0; // path of length 0 does not require a search - result_data[search_num] = (int64_t)-1; /* no path */ - visit1[src_data[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - break; } else { result_data[search_num] = (int64_t)-1; /* initialize to no path */ - seen[src_data[src_pos]][lane] = true; visit1[src_data[src_pos]][lane] = true; lane_to_num[lane] = search_num; // active lane active++; @@ -152,46 +127,24 @@ static void IterativeLengthFunction(DataChunk &args, ExpressionState &state, } // make passes while a lane is still active - for (int64_t iter = 1; active; iter++) { - // if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, - // (iter & 1) ? visit2 : visit1)) { - // break; - // } - bool stop = !IterativeLength(v_size, v, e, parents_v, seen, (iter & 1) ? visit1 : visit2, - (iter & 1) ? visit2 : visit1); + for (int64_t iter = 1; active && iter <= upper_bound; iter++) { + if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } // detect lanes that finished for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = lane_to_num[lane]; if (search_num >= 0) { // active lane int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (seen[dst_data[dst_pos]][lane]){ - - // check if the path length is within bounds - // bound vector is either a constant or a flat vector - if (lower_bound.GetVectorType() == VectorType::CONSTANT_VECTOR ? - iter < lower_bound_data[0] : iter < lower_bound_data[dst_pos]) { - // when reach the destination too early, treat destination as null - // looks like the graph does not have that vertex - seen[dst_data[dst_pos]][lane] = false; - (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false - : visit1[dst_data[dst_pos]][lane] = false; - continue; - } else if (upper_bound.GetVectorType() == VectorType::CONSTANT_VECTOR ? - iter > upper_bound_data[0] : iter > upper_bound_data[dst_pos]) { - result_validity.SetInvalid(search_num); - result_data[search_num] = (int64_t)-1; /* no path */ - } else { - result_data[search_num] = - iter; /* found at iter => iter = path length */ - } + result_data[search_num] = + iter; /* found at iter => iter = path length */ lane_to_num[lane] = -1; // mark inactive active--; } } } - if (stop) { - break; - } } // no changes anymore: any still active searches have no path diff --git a/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp b/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp new file mode 100644 index 00000000..1fe6d52d --- /dev/null +++ b/duckpgq/src/duckpgq/functions/scalar/iterativelength_lowerbound.cpp @@ -0,0 +1,209 @@ +#include +#include "duckdb/main/client_data.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckpgq/common.hpp" +#include "duckpgq/duckpgq_functions.hpp" + +namespace duckdb { + +static bool IterativeLengthLowerBound(int64_t v_size, int64_t *v, vector &e, + vector>> &parents_v, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + for (auto i = 0; i < v_size; i++) { + next[i] = 0; + } + + for (auto lane = 0; lane < LANE_LIMIT; lane++) { + for (auto i = 0; i < v_size; i++) { + if (visit[i][lane]) { + for (auto offset = v[i]; offset < v[i + 1]; offset++) { + auto n = e[offset]; + if (seen[n][lane] == false || parents_v[i][lane].find(n) == parents_v[i][lane].end()) { + parents_v[n][lane] = parents_v[i][lane]; + parents_v[n][lane].insert(i); + next[n][lane] = true; + } + } + } + } + } + + for (auto i = 0; i < v_size; i++) { + seen[i] = seen[i] | next[i]; + change |= next[i].any(); + } + + return change; +} + +static void IterativeLengthLowerBoundFunction(DataChunk &args, ExpressionState &state, + Vector &result) { + auto &func_expr = (BoundFunctionExpression &)state.expr; + auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info; + auto duckpgq_state_entry = info.context.registered_state.find("duckpgq"); + if (duckpgq_state_entry == info.context.registered_state.end()) { + //! Wondering how you can get here if the extension wasn't loaded, but + //! leaving this check in anyways + throw MissingExtensionException( + "The DuckPGQ extension has not been loaded"); + } + auto duckpgq_state = + reinterpret_cast(duckpgq_state_entry->second.get()); + + D_ASSERT(duckpgq_state->csr_list[info.csr_id]); + + if ((uint64_t)info.csr_id + 1 > duckpgq_state->csr_list.size()) { + throw ConstraintException("Invalid ID"); + } + auto csr_entry = duckpgq_state->csr_list.find((uint64_t)info.csr_id); + if (csr_entry == duckpgq_state->csr_list.end()) { + throw ConstraintException( + "Need to initialize CSR before doing shortest path"); + } + + if (!(csr_entry->second->initialized_v && csr_entry->second->initialized_e)) { + throw ConstraintException( + "Need to initialize CSR before doing shortest path"); + } + int64_t v_size = args.data[1].GetValue(0).GetValue(); + int64_t *v = (int64_t *)duckpgq_state->csr_list[info.csr_id]->v; + vector &e = duckpgq_state->csr_list[info.csr_id]->e; + + // get src and dst vectors for searches + auto &src = args.data[2]; + auto &dst = args.data[3]; + UnifiedVectorFormat vdata_src; + UnifiedVectorFormat vdata_dst; + src.ToUnifiedFormat(args.size(), vdata_src); + dst.ToUnifiedFormat(args.size(), vdata_dst); + auto src_data = (int64_t *)vdata_src.data; + auto dst_data = (int64_t *)vdata_dst.data; + + // get lowerbound and upperbound + auto &lower = args.data[4]; + auto &upper = args.data[5]; + UnifiedVectorFormat vdata_lower_bound; + UnifiedVectorFormat vdata_upper_bound; + lower.ToUnifiedFormat(args.size(), vdata_lower_bound); + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto lower_bound = ((int64_t *)vdata_lower_bound.data)[0]; + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; + + ValidityMask &result_validity = FlatVector::Validity(result); + + // create result vector + result.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result); + + // create temp SIMD arrays + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); + + // maps lane to search number + short lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + + idx_t started_searches = 0; + while (started_searches < args.size()) { + + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + + // add search jobs to free lanes + uint64_t active = 0; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < args.size()) { + int64_t search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + result_data[search_num] = (int64_t)-1; /* no path */ + } else if (src_data[src_pos] == dst_data[dst_pos]) { + result_data[search_num] = (int64_t)-1; /* no path */ + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } else { + result_data[search_num] = (int64_t)-1; /* initialize to no path */ + seen[src_data[src_pos]][lane] = true; + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } + } + } + + // make passes while a lane is still active + for (int64_t iter = 1; active && iter <= upper_bound; iter++) { + bool stop = !IterativeLengthLowerBound(v_size, v, e, parents_v, seen, (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1); + // detect lanes that finished + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (seen[dst_data[dst_pos]][lane]){ + + // check if the path length is within bounds + // bound vector is either a constant or a flat vector + if (iter < lower_bound) { + // when reach the destination too early, treat destination as null + // looks like the graph does not have that vertex + seen[dst_data[dst_pos]][lane] = false; + (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false + : visit1[dst_data[dst_pos]][lane] = false; + continue; + } else { + result_data[search_num] = + iter; /* found at iter => iter = path length */ + lane_to_num[lane] = -1; // mark inactive + active--; + } + + } + } + } + if (stop) { + break; + } + } + + // no changes anymore: any still active searches have no path + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + result_validity.SetInvalid(search_num); + result_data[search_num] = (int64_t)-1; /* no path */ + lane_to_num[lane] = -1; // mark inactive + } + } + } + duckpgq_state->csr_to_delete.insert(info.csr_id); +} + +CreateScalarFunctionInfo DuckPGQFunctions::GetIterativeLengthLowerBoundFunction() { + auto fun = ScalarFunction("iterativelength_lowerbound", + {LogicalType::INTEGER, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT}, + LogicalType::BIGINT, IterativeLengthLowerBoundFunction, + IterativeLengthFunctionData::IterativeLengthBind); + return CreateScalarFunctionInfo(fun); +} + +} // namespace duckdb diff --git a/duckpgq/src/duckpgq/functions/scalar/shortest_path.cpp b/duckpgq/src/duckpgq/functions/scalar/shortest_path.cpp index aca4bdab..fd68ca52 100644 --- a/duckpgq/src/duckpgq/functions/scalar/shortest_path.cpp +++ b/duckpgq/src/duckpgq/functions/scalar/shortest_path.cpp @@ -13,16 +13,12 @@ namespace duckdb { static bool IterativeLength(int64_t v_size, int64_t *V, vector &E, vector &edge_ids, - vector>> &parents_v, - vector>> &paths_v, - vector>> &paths_e, + vector> &parents_v, + vector> &parents_e, vector> &seen, vector> &visit, vector> &next) { bool change = false; - map, unordered_set> parents_v_cache; - map, vector> paths_v_cache; - map, vector> paths_e_cache; for (auto v = 0; v < v_size; v++) { next[v] = 0; } @@ -32,46 +28,20 @@ static bool IterativeLength(int64_t v_size, int64_t *V, vector &E, for (auto e = V[v]; e < V[v + 1]; e++) { auto n = E[e]; auto edge_id = edge_ids[e]; - - for (auto lane = 0; lane < LANE_LIMIT; lane++) { - if (visit[v][lane]) { - //! If the node has not been visited, then update the parent and edge - if (seen[n][lane] == false || parents_v[v][lane].find(n) == parents_v[v][lane].end()) { - if (visit[n][lane]) { - parents_v_cache[make_pair(n, lane)] = parents_v[v][lane]; - parents_v_cache[make_pair(n, lane)].insert(v); - paths_v_cache[make_pair(n, lane)] = paths_v[v][lane]; - paths_v_cache[make_pair(n, lane)].push_back(v); - paths_e_cache[make_pair(n, lane)] = paths_e[v][lane]; - paths_e_cache[make_pair(n, lane)].push_back(edge_id); - } else { - parents_v[n][lane] = parents_v[v][lane]; - parents_v[n][lane].insert(v); - paths_v[n][lane] = paths_v[v][lane]; - paths_v[n][lane].push_back(v); - paths_e[n][lane] = paths_e[v][lane]; - paths_e[n][lane].push_back(edge_id); - } - next[n][lane] = true; - } - } + next[n] = next[n] | visit[v]; + for (auto l = 0; l < LANE_LIMIT; l++) { + parents_v[n][l] = + ((parents_v[n][l] == -1) && visit[v][l]) ? v : parents_v[n][l]; + parents_e[n][l] = ((parents_e[n][l] == -1) && visit[v][l]) + ? edge_id + : parents_e[n][l]; } } } } - for (auto const& cache: parents_v_cache) { - parents_v[cache.first.first][cache.first.second] = cache.second; - } - for (auto const& cache: paths_v_cache) { - paths_v[cache.first.first][cache.first.second] = cache.second; - } - for (auto const& cache: paths_e_cache) { - paths_e[cache.first.first][cache.first.second] = cache.second; - } - for (auto v = 0; v < v_size; v++) { - // next[v] = next[v] & ~seen[v]; + next[v] = next[v] & ~seen[v]; seen[v] = seen[v] | next[v]; change |= next[v].any(); } @@ -111,14 +81,10 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state, auto dst_data = (int64_t *)vdata_dst.data; // get lowerbound and upperbound - auto &lower_bound = args.data[4]; - auto &upper_bound = args.data[5]; - UnifiedVectorFormat vdata_lower_bound; + auto &upper = args.data[5]; UnifiedVectorFormat vdata_upper_bound; - lower_bound.ToUnifiedFormat(args.size(), vdata_lower_bound); - upper_bound.ToUnifiedFormat(args.size(), vdata_upper_bound); - auto lower_bound_data = (int64_t *)vdata_lower_bound.data; - auto upper_bound_data = (int64_t *)vdata_upper_bound.data; + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; result.SetVectorType(VectorType::FLAT_VECTOR); auto result_data = FlatVector::GetData(result); @@ -128,11 +94,10 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state, vector> seen(v_size); vector> visit1(v_size); vector> visit2(v_size); - - vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); - vector>> paths_v(v_size, std::vector>(LANE_LIMIT)); - vector>> paths_e(v_size, std::vector>(LANE_LIMIT)); - + vector> parents_v(v_size, + std::vector(LANE_LIMIT, -1)); + vector> parents_e(v_size, + std::vector(LANE_LIMIT, -1)); // maps lane to search number int16_t lane_to_num[LANE_LIMIT]; @@ -157,25 +122,15 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state, while (started_searches < args.size()) { int64_t search_num = started_searches++; int64_t src_pos = vdata_src.sel->get_index(search_num); - int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (!vdata_src.validity.RowIsValid(src_pos)) { result_validity.SetInvalid(search_num); - } else if (src_data[src_pos] == dst_data[dst_pos]) { - // unique_ptr output = - // make_uniq(LogicalType::LIST(LogicalType::BIGINT)); - // ListVector::PushBack(*output, src_data[src_pos]); - // ListVector::Append(result, ListVector::GetEntry(*output), - // ListVector::GetListSize(*output)); - // result_data[search_num].length = ListVector::GetListSize(*output); - // result_data[search_num].offset = total_len; - // total_len += result_data[search_num].length; - visit1[src_data[src_pos]][lane] = true; - lane_to_num[lane] = search_num; // active lane - active++; - break; } else { visit1[src_data[src_pos]][lane] = true; - seen[src_data[src_pos]][lane] = true; + parents_v[src_data[src_pos]][lane] = + src_data[src_pos]; // Mark source with source id + parents_e[src_data[src_pos]][lane] = + -2; // Mark the source with -2, there is no incoming edge for the + // source. lane_to_num[lane] = search_num; // active lane active++; break; @@ -184,13 +139,14 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state, } //! make passes while a lane is still active - for (int64_t iter = 1; active; iter++) { + for (int64_t iter = 1; active && iter <= upper_bound; iter++) { //! Perform one step of bfs exploration - if (!IterativeLength(v_size, v, e, edge_ids, parents_v, paths_v, paths_e, seen, + if (!IterativeLength(v_size, v, e, edge_ids, parents_v, parents_e, seen, (iter & 1) ? visit1 : visit2, (iter & 1) ? visit2 : visit1)) { break; } + int64_t finished_searches = 0; // detect lanes that finished for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { int64_t search_num = lane_to_num[lane]; @@ -198,47 +154,78 @@ static void ShortestPathFunction(DataChunk &args, ExpressionState &state, //! Check if dst for a source has been seen int64_t dst_pos = vdata_dst.sel->get_index(search_num); if (seen[dst_data[dst_pos]][lane]) { - // check if the path length is within bounds - // bound vector is either a constant or a flat vector - if (lower_bound.GetVectorType() == VectorType::CONSTANT_VECTOR ? - iter < lower_bound_data[0] : iter < lower_bound_data[dst_pos]) { - // when reach the destination too early, treat destination as null - // looks like the graph does not have that vertex - seen[dst_data[dst_pos]][lane] = false; - (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false - : visit1[dst_data[dst_pos]][lane] = false; - continue; - } else if (upper_bound.GetVectorType() == VectorType::CONSTANT_VECTOR ? - iter > upper_bound_data[0] : iter > upper_bound_data[dst_pos]) { - result_validity.SetInvalid(search_num); - } else { - vector output_vector; - auto it_v = paths_v[dst_data[dst_pos]][lane].begin(), - end_v = paths_v[dst_data[dst_pos]][lane].end(); - auto it_e = paths_e[dst_data[dst_pos]][lane].begin(), - end_e = paths_e[dst_data[dst_pos]][lane].end(); - while (it_v != end_v && it_e != end_e) { - output_vector.push_back(*it_v); - output_vector.push_back(*it_e); - it_v++; - it_e++; - } - output_vector.push_back(dst_data[dst_pos]); - auto output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); - for (auto val : output_vector) { - Value value_to_insert = val; - ListVector::PushBack(*output, value_to_insert); - } - result_data[search_num].length = ListVector::GetListSize(*output); - result_data[search_num].offset = total_len; - ListVector::Append(result, ListVector::GetEntry(*output), - ListVector::GetListSize(*output)); - total_len += result_data[search_num].length; - } - lane_to_num[lane] = -1; // mark inactive + finished_searches++; } } } + if (finished_searches == LANE_LIMIT) { + break; + } + } + //! Reconstruct the paths + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num == -1) { // empty lanes + continue; + } + + //! Searches that have stopped have found a path + int64_t src_pos = vdata_src.sel->get_index(search_num); + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (src_data[src_pos] == dst_data[dst_pos]) { // Source == destination + unique_ptr output = + make_uniq(LogicalType::LIST(LogicalType::BIGINT)); + ListVector::PushBack(*output, src_data[src_pos]); + ListVector::Append(result, ListVector::GetEntry(*output), + ListVector::GetListSize(*output)); + result_data[search_num].length = ListVector::GetListSize(*output); + result_data[search_num].offset = total_len; + total_len += result_data[search_num].length; + continue; + } + std::vector output_vector; + std::vector output_edge; + auto source_v = src_data[src_pos]; // Take the source + + auto parent_vertex = + parents_v[dst_data[dst_pos]] + [lane]; // Take the parent vertex of the destination vertex + auto parent_edge = + parents_e[dst_data[dst_pos]] + [lane]; // Take the parent edge of the destination vertex + + output_vector.push_back(dst_data[dst_pos]); // Add destination vertex + output_vector.push_back(parent_edge); + while (parent_vertex != source_v) { // Continue adding vertices until we + // have reached the source vertex + //! -1 is used to signify no parent + if (parent_vertex == -1 || + parent_vertex == parents_v[parent_vertex][lane]) { + result_validity.SetInvalid(search_num); + break; + } + output_vector.push_back(parent_vertex); + parent_edge = parents_e[parent_vertex][lane]; + parent_vertex = parents_v[parent_vertex][lane]; + output_vector.push_back(parent_edge); + } + + if (!result_validity.RowIsValid(search_num)) { + continue; + } + output_vector.push_back(source_v); + std::reverse(output_vector.begin(), output_vector.end()); + auto output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); + for (auto val : output_vector) { + Value value_to_insert = val; + ListVector::PushBack(*output, value_to_insert); + } + + result_data[search_num].length = ListVector::GetListSize(*output); + result_data[search_num].offset = total_len; + ListVector::Append(result, ListVector::GetEntry(*output), + ListVector::GetListSize(*output)); + total_len += result_data[search_num].length; } } duckpgq_state->csr_to_delete.insert(info.csr_id); diff --git a/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp b/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp new file mode 100644 index 00000000..63daba93 --- /dev/null +++ b/duckpgq/src/duckpgq/functions/scalar/shortest_path_lowerbound.cpp @@ -0,0 +1,254 @@ +#include "duckdb/common/fstream.hpp" +#include "duckdb/common/profiler.hpp" +#include "duckdb/main/client_data.hpp" +#include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckpgq/common.hpp" +#include "duckpgq/duckpgq_functions.hpp" + +#include +#include + +namespace duckdb { + +static bool IterativeLengthLowerBound(int64_t v_size, int64_t *V, vector &E, + vector &edge_ids, + vector>> &parents_v, + vector>> &paths_v, + vector>> &paths_e, + vector> &seen, + vector> &visit, + vector> &next) { + bool change = false; + map, unordered_set> parents_v_cache; + map, vector> paths_v_cache; + map, vector> paths_e_cache; + for (auto v = 0; v < v_size; v++) { + next[v] = 0; + } + //! Keep track of edge id through which the node was reached + for (auto v = 0; v < v_size; v++) { + if (visit[v].any()) { + for (auto e = V[v]; e < V[v + 1]; e++) { + auto n = E[e]; + auto edge_id = edge_ids[e]; + + for (auto lane = 0; lane < LANE_LIMIT; lane++) { + if (visit[v][lane]) { + //! If the node has not been visited, then update the parent and edge + if (seen[n][lane] == false || parents_v[v][lane].find(n) == parents_v[v][lane].end()) { + if (visit[n][lane]) { + parents_v_cache[make_pair(n, lane)] = parents_v[v][lane]; + parents_v_cache[make_pair(n, lane)].insert(v); + paths_v_cache[make_pair(n, lane)] = paths_v[v][lane]; + paths_v_cache[make_pair(n, lane)].push_back(v); + paths_e_cache[make_pair(n, lane)] = paths_e[v][lane]; + paths_e_cache[make_pair(n, lane)].push_back(edge_id); + } else { + parents_v[n][lane] = parents_v[v][lane]; + parents_v[n][lane].insert(v); + paths_v[n][lane] = paths_v[v][lane]; + paths_v[n][lane].push_back(v); + paths_e[n][lane] = paths_e[v][lane]; + paths_e[n][lane].push_back(edge_id); + } + next[n][lane] = true; + } + } + } + } + } + } + + for (auto const& cache: parents_v_cache) { + parents_v[cache.first.first][cache.first.second] = cache.second; + } + for (auto const& cache: paths_v_cache) { + paths_v[cache.first.first][cache.first.second] = cache.second; + } + for (auto const& cache: paths_e_cache) { + paths_e[cache.first.first][cache.first.second] = cache.second; + } + + for (auto v = 0; v < v_size; v++) { + seen[v] = seen[v] | next[v]; + change |= next[v].any(); + } + return change; +} + +static void ShortestPathLowerBoundFunction(DataChunk &args, ExpressionState &state, + Vector &result) { + auto &func_expr = (BoundFunctionExpression &)state.expr; + auto &info = (IterativeLengthFunctionData &)*func_expr.bind_info; + auto duckpgq_state_entry = info.context.registered_state.find("duckpgq"); + if (duckpgq_state_entry == info.context.registered_state.end()) { + //! Wondering how you can get here if the extension wasn't loaded, but + //! leaving this check in anyways + throw MissingExtensionException( + "The DuckPGQ extension has not been loaded"); + } + auto duckpgq_state = + reinterpret_cast(duckpgq_state_entry->second.get()); + + D_ASSERT(duckpgq_state->csr_list[info.csr_id]); + int32_t id = args.data[0].GetValue(0).GetValue(); + int64_t v_size = args.data[1].GetValue(0).GetValue(); + + int64_t *v = (int64_t *)duckpgq_state->csr_list[id]->v; + vector &e = duckpgq_state->csr_list[id]->e; + vector &edge_ids = duckpgq_state->csr_list[id]->edge_ids; + + auto &src = args.data[2]; + auto &target = args.data[3]; + + UnifiedVectorFormat vdata_src, vdata_dst; + src.ToUnifiedFormat(args.size(), vdata_src); + target.ToUnifiedFormat(args.size(), vdata_dst); + + auto src_data = (int64_t *)vdata_src.data; + auto dst_data = (int64_t *)vdata_dst.data; + + // get lowerbound and upperbound + auto &lower = args.data[4]; + auto &upper = args.data[5]; + UnifiedVectorFormat vdata_lower_bound; + UnifiedVectorFormat vdata_upper_bound; + lower.ToUnifiedFormat(args.size(), vdata_lower_bound); + upper.ToUnifiedFormat(args.size(), vdata_upper_bound); + auto lower_bound = ((int64_t *)vdata_lower_bound.data)[0]; + auto upper_bound = ((int64_t *)vdata_upper_bound.data)[0]; + + result.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(result); + ValidityMask &result_validity = FlatVector::Validity(result); + + // create temp SIMD arrays + vector> seen(v_size); + vector> visit1(v_size); + vector> visit2(v_size); + + vector>> parents_v(v_size, std::vector>(LANE_LIMIT)); + vector>> paths_v(v_size, std::vector>(LANE_LIMIT)); + vector>> paths_e(v_size, std::vector>(LANE_LIMIT)); + + + // maps lane to search number + int16_t lane_to_num[LANE_LIMIT]; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; // inactive + } + int64_t total_len = 0; + + idx_t started_searches = 0; + while (started_searches < args.size()) { + + // empty visit vectors + for (auto i = 0; i < v_size; i++) { + seen[i] = 0; + visit1[i] = 0; + } + + // add search jobs to free lanes + uint64_t active = 0; + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + lane_to_num[lane] = -1; + while (started_searches < args.size()) { + int64_t search_num = started_searches++; + int64_t src_pos = vdata_src.sel->get_index(search_num); + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (!vdata_src.validity.RowIsValid(src_pos)) { + result_validity.SetInvalid(search_num); + } else if (src_data[src_pos] == dst_data[dst_pos]) { + visit1[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } else { + visit1[src_data[src_pos]][lane] = true; + seen[src_data[src_pos]][lane] = true; + lane_to_num[lane] = search_num; // active lane + active++; + break; + } + } + } + + //! make passes while a lane is still active + for (int64_t iter = 1; active && iter <= upper_bound; iter++) { + //! Perform one step of bfs exploration + if (!IterativeLengthLowerBound(v_size, v, e, edge_ids, parents_v, paths_v, paths_e, seen, + (iter & 1) ? visit1 : visit2, + (iter & 1) ? visit2 : visit1)) { + break; + } + // detect lanes that finished + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + //! Check if dst for a source has been seen + int64_t dst_pos = vdata_dst.sel->get_index(search_num); + if (seen[dst_data[dst_pos]][lane]) { + // check if the path length is within bounds + // bound vector is either a constant or a flat vector + if (iter < lower_bound) { + // when reach the destination too early, treat destination as null + // looks like the graph does not have that vertex + seen[dst_data[dst_pos]][lane] = false; + (iter & 1) ? visit2[dst_data[dst_pos]][lane] = false + : visit1[dst_data[dst_pos]][lane] = false; + continue; + } else { + vector output_vector; + auto it_v = paths_v[dst_data[dst_pos]][lane].begin(), + end_v = paths_v[dst_data[dst_pos]][lane].end(); + auto it_e = paths_e[dst_data[dst_pos]][lane].begin(), + end_e = paths_e[dst_data[dst_pos]][lane].end(); + while (it_v != end_v && it_e != end_e) { + output_vector.push_back(*it_v); + output_vector.push_back(*it_e); + it_v++; + it_e++; + } + output_vector.push_back(dst_data[dst_pos]); + auto output = make_uniq(LogicalType::LIST(LogicalType::BIGINT)); + for (auto val : output_vector) { + Value value_to_insert = val; + ListVector::PushBack(*output, value_to_insert); + } + result_data[search_num].length = ListVector::GetListSize(*output); + result_data[search_num].offset = total_len; + ListVector::Append(result, ListVector::GetEntry(*output), + ListVector::GetListSize(*output)); + total_len += result_data[search_num].length; + lane_to_num[lane] = -1; // mark inactive + } + } + } + } + } + + // no changes anymore: any still active searches have no path + for (int64_t lane = 0; lane < LANE_LIMIT; lane++) { + int64_t search_num = lane_to_num[lane]; + if (search_num >= 0) { // active lane + result_validity.SetInvalid(search_num); + lane_to_num[lane] = -1; // mark inactive + } + } + } + duckpgq_state->csr_to_delete.insert(info.csr_id); +} + +CreateScalarFunctionInfo DuckPGQFunctions::GetShortestPathLowerBoundFunction() { + auto fun = ScalarFunction("shortestpath_lowerbound", + {LogicalType::INTEGER, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT, + LogicalType::BIGINT, LogicalType::BIGINT}, + LogicalType::LIST(LogicalType::BIGINT), + ShortestPathLowerBoundFunction, + IterativeLengthFunctionData::IterativeLengthBind); + return CreateScalarFunctionInfo(fun); +} + +} // namespace duckdb diff --git a/duckpgq/src/duckpgq/functions/tablefunctions/match.cpp b/duckpgq/src/duckpgq/functions/tablefunctions/match.cpp index 1c6cb2bb..f098e330 100644 --- a/duckpgq/src/duckpgq/functions/tablefunctions/match.cpp +++ b/duckpgq/src/duckpgq/functions/tablefunctions/match.cpp @@ -541,7 +541,8 @@ unique_ptr PGQMatchFunction::CreatePathFindingFunction( make_uniq(Value::INTEGER(static_cast(edge_subpath->upper)))); auto shortest_path_function = make_uniq( - "shortestpath", std::move(pathfinding_children)); + edge_subpath->lower > 1 ? "shortestpath_lowerbound": "shortestpath", + std::move(pathfinding_children)); if (!final_list) { final_list = std::move(shortest_path_function); @@ -679,7 +680,8 @@ void PGQMatchFunction::AddPathFinding( make_uniq(Value::INTEGER(static_cast(subpath->upper)))); auto reachability_function = make_uniq( - "iterativelength", std::move(pathfinding_children)); + subpath->lower > 1 ? "iterativelength_lowerbound": "iterativelength", + std::move(pathfinding_children)); auto cte_col_ref = make_uniq("temp", "__x"); diff --git a/test/sql/path-finding/complex_matching.test b/test/sql/path-finding/complex_matching.test index 25d62a15..e67934e2 100644 --- a/test/sql/path-finding/complex_matching.test +++ b/test/sql/path-finding/complex_matching.test @@ -76,7 +76,7 @@ query IIIIIII 28587302322223 [0, 1, 26, 64, 33, 78, 38] 3 [0, 26, 33, 38] [1, 64, 78] 14 24189255811081 30786325577731 [0, 1, 26, 64, 33, 79, 39] 3 [0, 26, 33, 39] [1, 64, 79] 14 24189255811081 32985348833329 [0, 1, 26, 64, 33, 80, 43] 3 [0, 26, 33, 43] [1, 64, 80] 14 24189255811081 -35184372088850 [0, 1, 26, 64, 33, 77, 36, 82, 45] 4 [0, 26, 33, 36, 45] [1, 64, 77, 82] 14 24189255811081 +35184372088850 [0, 1, 26, 63, 32, 76, 36, 82, 45] 4 [0, 26, 32, 36, 45] [1, 63, 76, 82] 14 24189255811081 28587302322204 [0, 2, 32, 75, 33, 77, 36] 3 [0, 32, 33, 36] [2, 75, 77] 14 26388279066668 28587302322223 [0, 2, 32, 75, 33, 78, 38] 3 [0, 32, 33, 38] [2, 75, 78] 14 26388279066668 30786325577731 [0, 2, 32, 75, 33, 79, 39] 3 [0, 32, 33, 39] [2, 75, 79] 14 26388279066668 @@ -366,5 +366,5 @@ query III [1, 3, 5, 27, 40] 16 30786325577740 [1, 5, 33, 80, 43] 16 32985348833329 [1, 3, 5, 22, 26, 66, 44] 16 35184372088834 -[1, 6, 36, 82, 45] 16 35184372088850 +[1, 3, 5, 28, 45] 16 35184372088850 [1, 3, 5, 23, 31, 74, 46] 16 35184372088856 diff --git a/test/sql/path-finding/shortest_path.test b/test/sql/path-finding/shortest_path.test index 7bfa70a8..3969b411 100644 --- a/test/sql/path-finding/shortest_path.test +++ b/test/sql/path-finding/shortest_path.test @@ -116,9 +116,9 @@ WITH cte1 AS ( FROM Know k JOIN student a on a.id = k.src JOIN student c on c.id = k.dst -) SELECT shortestpath(0, (select count(*) from student), a.rowid, b.rowid, 2, 3) as path, a.name as a_name, b.name as b_name +) SELECT shortestpath_lowerbound(0, (select count(*) from student), a.rowid, b.rowid, 2, 3) as path, a.name as a_name, b.name as b_name FROM student a, student b, (select count(cte1.temp) * 0 as temp from cte1) __x - WHERE __x.temp * 0 + iterativelength(0, (select count(*) from student), a.rowid, b.rowid, 2, 3) + WHERE __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from student), a.rowid, b.rowid, 2, 3) ORDER BY a.name, b.name; ---- [0, 2, 3, 3, 0] Daniel Daniel diff --git a/test/sql/path-finding/shortest_path_bound.test b/test/sql/path-finding/shortest_path_bound.test index 013f6c50..97ee93ba 100644 --- a/test/sql/path-finding/shortest_path_bound.test +++ b/test/sql/path-finding/shortest_path_bound.test @@ -53,9 +53,9 @@ WITH cte1 AS ( FROM Know k JOIN Point a on a.id = k.src JOIN Point c on c.id = k.dst -) SELECT a.id as srd_id, b.id as dst_id, iterativelength(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3) as path_length +) SELECT a.id as srd_id, b.id as dst_id, iterativelength_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3) as path_length FROM Point a, Point b, (select count(cte1.temp) * 0 as temp from cte1) __x - WHERE a.id = 0 and __x.temp * 0 + iterativelength(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3); + WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point), a.rowid, b.rowid, 2, 3); ---- 0 1 3 0 3 2 @@ -139,9 +139,9 @@ WITH cte1 AS ( FROM know2 k JOIN Point2 a on a.id = k.src JOIN Point2 c on c.id = k.dst -) SELECT a.id, b.id, iterativelength(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30) as path_length +) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30) as path_length FROM Point2 a, Point2 b, (select count(cte1.temp) * 0 as temp from cte1) __x - WHERE a.id = 0 and __x.temp * 0 + iterativelength(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30); + WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point2), a.rowid, b.rowid, 2, 30); ---- 0 2 2 @@ -192,9 +192,9 @@ WITH cte1 AS ( FROM know3 k JOIN Point3 a on a.id = k.src JOIN Point3 c on c.id = k.dst -) SELECT a.id, b.id, iterativelength(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3) as path_length +) SELECT a.id, b.id, iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3) as path_length FROM Point3 a, Point3 b, (select count(cte1.temp) * 0 as temp from cte1) __x - WHERE a.id = 0 and __x.temp * 0 + iterativelength(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3); + WHERE a.id = 0 and __x.temp * 0 + iterativelength_lowerbound(0, (select count(*) from Point3), a.rowid, b.rowid, 2, 3); ---- 0 0 2