Skip to content

Commit

Permalink
Merge pull request #140 from cwida/139-non-unique-vertex-error
Browse files Browse the repository at this point in the history
Error when non-unique vertices are detected during CSR creation
  • Loading branch information
Dtenwolde authored Sep 3, 2024
2 parents 1e1565c + 1216fb2 commit 206e2da
Show file tree
Hide file tree
Showing 18 changed files with 302 additions and 453 deletions.
29 changes: 22 additions & 7 deletions src/core/functions/scalar/csr_creation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,19 @@ static void CreateCsrEdgeFunction(DataChunk &args, ExpressionState &state,

int64_t vertex_size = args.data[1].GetValue(0).GetValue<int64_t>();
int64_t edge_size = args.data[2].GetValue(0).GetValue<int64_t>();
int64_t edge_size_count = args.data[3].GetValue(0).GetValue<int64_t>();
if (edge_size != edge_size_count) {
duckpgq_state->csr_to_delete.insert(info.id);
throw ConstraintException("Non-unique vertices detected. Make sure all vertices are unique for path-finding queries.");
}

auto csr_entry = duckpgq_state->csr_list.find(info.id);
if (!csr_entry->second->initialized_e) {
CsrInitializeEdge(*duckpgq_state, info.id, vertex_size, edge_size);
}
if (info.weight_type == LogicalType::SQLNULL) {
TernaryExecutor::Execute<int64_t, int64_t, int64_t, int32_t>(
args.data[3], args.data[4], args.data[5], result, args.size(),
args.data[4], args.data[5], args.data[6], result, args.size(),
[&](int64_t src, int64_t dst, int64_t edge_id) {
auto pos = ++csr_entry->second->v[src + 1];
csr_entry->second->e[(int64_t)pos - 1] = dst;
Expand All @@ -152,13 +157,13 @@ static void CreateCsrEdgeFunction(DataChunk &args, ExpressionState &state,
});
return;
}
auto weight_type = args.data[6].GetType().InternalType();
auto weight_type = args.data[7].GetType().InternalType();
if (!csr_entry->second->initialized_w) {
CsrInitializeWeight(*duckpgq_state, info.id, edge_size, weight_type);
}
if (weight_type == PhysicalType::INT64) {
QuaternaryExecutor::Execute<int64_t, int64_t, int64_t, int64_t, int32_t>(
args.data[3], args.data[4], args.data[5], args.data[6], result,
args.data[4], args.data[5], args.data[6], args.data[7], result,
args.size(),
[&](int64_t src, int64_t dst, int64_t edge_id, int64_t weight) {
auto pos = ++csr_entry->second->v[src + 1];
Expand All @@ -171,7 +176,7 @@ static void CreateCsrEdgeFunction(DataChunk &args, ExpressionState &state,
}

QuaternaryExecutor::Execute<int64_t, int64_t, int64_t, double_t, int32_t>(
args.data[3], args.data[4], args.data[5], args.data[6], result,
args.data[4], args.data[5], args.data[6], args.data[7], result,
args.size(),
[&](int64_t src, int64_t dst, int64_t edge_id, double_t weight) {
auto pos = ++csr_entry->second->v[src + 1];
Expand All @@ -196,24 +201,34 @@ ScalarFunctionSet GetCSRVertexFunction() {

ScalarFunctionSet GetCSREdgeFunction() {
ScalarFunctionSet set("create_csr_edge");
/* 1. CSR ID
* 2. Vertex size
* 3. Sum of the edges (assuming all unique vertices)
* 4. Edge size (to ensure all vertices are unique, this should equal point 3)
* 5. source rowid
* 6. destination rowid
* 7. edge rowid
* 8. <optional> edge weight (INT OR DOUBLE)
*/

//! No edge weight
set.AddFunction(ScalarFunction({LogicalType::INTEGER, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT},
LogicalType::INTEGER, CreateCsrEdgeFunction,
CSRFunctionData::CSREdgeBind));

//! Integer for edge weight
set.AddFunction(ScalarFunction({LogicalType::INTEGER, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT},
LogicalType::INTEGER, CreateCsrEdgeFunction,
CSRFunctionData::CSREdgeBind));

//! Double for edge weight
set.AddFunction(ScalarFunction({LogicalType::INTEGER, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::BIGINT, LogicalType::BIGINT,
LogicalType::DOUBLE},
LogicalType::INTEGER, CreateCsrEdgeFunction,
Expand Down
3 changes: 0 additions & 3 deletions src/core/functions/scalar/iterativelength2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ static void IterativeLength2Function(DataChunk &args, ExpressionState &state,

// make passes while a lane is still active
for (int64_t iter = 1; active; iter++) {
// std::cout << "Single direction iteration: " << iter <<
// std::endl;

if (!IterativeLength2(v_size, v, e, seen, (iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
Expand Down
22 changes: 0 additions & 22 deletions src/core/functions/scalar/weakly_connected_component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,6 @@ static void UpdateComponentId(int64_t node, int64_t component_id,
}
}

// Fallback for vertices whose component id is still unset (-1): each such
// vertex becomes its own component, after which the result vector is filled
// by looking up the component id of every valid source entry.
//
// result    - output vector; written through its flat int64_t data pointer
// info      - function data holding the componentId table that is patched
// vdata_src - unified format of the source column (selection + validity)
// src_data  - raw source values, used as indices into info.componentId
// v_size    - upper bound used for BOTH loops below
//
// NOTE(review): the second loop indexes result rows with the same v_size
// bound as the vertex loop — confirm against the caller that this is the
// intended row count.
static void AssignUnfinishedLanesToComponent(
    Vector &result, WeaklyConnectedComponentFunctionData &info,
    UnifiedVectorFormat &vdata_src, int64_t *src_data, size_t v_size) {
  auto out = FlatVector::GetData<int64_t>(result);

  // Pass 1: give every still-unassigned vertex its own index as component id.
  for (idx_t vertex = 0; vertex < v_size; vertex++) {
    if (info.componentId[vertex] != -1) {
      continue;
    }
    info.componentId[vertex] = vertex;
  }

  // Pass 2: emit the component id of each valid source entry; invalid rows
  // are left untouched.
  for (idx_t row = 0; row < v_size; row++) {
    int64_t src_pos = vdata_src.sel->get_index(row);
    if (!vdata_src.validity.RowIsValid(src_pos)) {
      continue;
    }
    out[row] = info.componentId[src_data[src_pos]];
  }
}

static void WeaklyConnectedComponentFunction(DataChunk &args,
ExpressionState &state,
Vector &result) {
Expand Down
29 changes: 2 additions & 27 deletions src/core/functions/table/match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,31 +145,6 @@ PathElement *PGQMatchFunction::GetPathElement(
throw InternalException("Unknown path reference type detected");
}

// Builds a scalar subquery expression of the shape
//   SELECT count(s.id) FROM src s
// where the table is edge_table->source_reference aliased as prev_binding,
// and the counted column is edge_table->source_pk[0] qualified by that alias.
unique_ptr<SubqueryExpression> PGQMatchFunction::GetCountTable(
    const shared_ptr<PropertyGraphTable> &edge_table,
    const string &prev_binding) {
  // FROM <source_reference> <prev_binding>
  auto source_table = make_uniq<BaseTableRef>();
  source_table->table_name = edge_table->source_reference;
  source_table->alias = prev_binding;

  // count(<prev_binding>.<source_pk[0]>)
  vector<unique_ptr<ParsedExpression>> count_args;
  count_args.push_back(
      make_uniq<ColumnRefExpression>(edge_table->source_pk[0], prev_binding));
  auto count_expr =
      make_uniq<FunctionExpression>("count", std::move(count_args));

  // Assemble the SELECT node from the table reference and the aggregate.
  auto count_node = make_uniq<SelectNode>();
  count_node->from_table = std::move(source_table);
  count_node->select_list.push_back(std::move(count_expr));

  auto count_statement = make_uniq<SelectStatement>();
  count_statement->node = std::move(count_node);

  // Wrap the statement as a SCALAR subquery so it can be used as an argument.
  auto count_subquery = make_uniq<SubqueryExpression>();
  count_subquery->subquery = std::move(count_statement);
  count_subquery->subquery_type = SubqueryType::SCALAR;
  return count_subquery;
}

unique_ptr<SubqueryRef> PGQMatchFunction::CreateCountCTESubquery() {
//! BEGIN OF (SELECT count(cte1.temp) as temp * 0 from cte1) __x

Expand Down Expand Up @@ -388,7 +363,7 @@ unique_ptr<CommonTableExpressionInfo> PGQMatchFunction::GenerateShortestPathCTE(
vector<unique_ptr<ParsedExpression>> pathfinding_children;
pathfinding_children.push_back(std::move(csr_id));
pathfinding_children.push_back(std::move(GetCountTable(
edge_table, previous_vertex_element->variable_binding)));
edge_table->source_reference, previous_vertex_element->variable_binding, edge_table->source_pk[0])));
pathfinding_children.push_back(std::move(src_row_id));
pathfinding_children.push_back(std::move(dst_row_id));

Expand Down Expand Up @@ -628,7 +603,7 @@ unique_ptr<ParsedExpression> PGQMatchFunction::AddPathQuantifierCondition(
vector<unique_ptr<ParsedExpression>> pathfinding_children;
pathfinding_children.push_back(std::move(csr_id));
pathfinding_children.push_back(
std::move(GetCountTable(edge_table, prev_binding)));
std::move(GetCountTable(edge_table->source_reference, prev_binding, edge_table->source_pk[0])));
pathfinding_children.push_back(std::move(src_row_id));
pathfinding_children.push_back(std::move(dst_row_id));

Expand Down
Loading

0 comments on commit 206e2da

Please sign in to comment.