From eb3f0863655606a22556030e718daf59b445812b Mon Sep 17 00:00:00 2001 From: liulx20 <68941872+liulx20@users.noreply.github.com> Date: Thu, 8 Aug 2024 10:30:04 +0800 Subject: [PATCH] fix(interactive): Support multiple properties edge expand (#4115) Support multiple properties edge expand for adhoc queries --- .../graph_db/runtime/adhoc/expr_impl.cc | 15 ++ .../graph_db/runtime/adhoc/expr_impl.h | 50 +++-- flex/engines/graph_db/runtime/adhoc/var.cc | 7 +- .../graph_db/runtime/common/accessors.cc | 142 ++++++++++---- .../graph_db/runtime/common/accessors.h | 173 +++++++++++++++++- .../runtime/common/columns/edge_columns.cc | 19 +- .../runtime/common/columns/edge_columns.h | 24 ++- .../runtime/common/operators/edge_expand.cc | 20 +- .../runtime/common/operators/edge_expand.h | 12 +- flex/storages/rt_mutable_graph/schema.cc | 15 +- flex/storages/rt_mutable_graph/schema.h | 3 + flex/utils/property/column.cc | 50 ++++- flex/utils/property/column.h | 95 +++++++++- 13 files changed, 554 insertions(+), 71 deletions(-) diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.cc b/flex/engines/graph_db/runtime/adhoc/expr_impl.cc index 6b46396dee2f..47bef9a153e0 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.cc +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.cc @@ -119,6 +119,10 @@ RTAny LogicalExpr::eval_vertex(label_t label, vid_t v, size_t idx) const { std::string rhs(rhs_->eval_vertex(label, v, idx).as_string()); return RTAny::from_bool(std::regex_match(ret, std::regex(rhs))); + } else if (logic_ == common::Logical::OR) { + bool ret = (rhs_->eval_vertex(label, v, idx).as_bool() || + lhs_->eval_vertex(label, v, idx).as_bool()); + return RTAny::from_bool(ret); } else { LOG(FATAL) << "not support..." << static_cast(logic_); } @@ -155,6 +159,14 @@ RTAny LogicalExpr::eval_edge(const LabelTriplet& label, vid_t src, vid_t dst, bool ret = (rhs_->eval_edge(label, src, dst, data, idx).as_bool() && lhs_->eval_edge(label, src, dst, data, idx).as_bool()); return RTAny::from_bool(ret); + } else if (logic_ == common::Logical::REGEX) { + std::string ret(lhs_->eval_edge(label, src, dst, data, idx).as_string()); + std::string rhs(rhs_->eval_edge(label, src, dst, data, idx).as_string()); + return RTAny::from_bool(std::regex_match(ret, std::regex(rhs))); + } else if (logic_ == common::Logical::OR) { + bool ret = (rhs_->eval_edge(label, src, dst, data, idx).as_bool() || + lhs_->eval_edge(label, src, dst, data, idx).as_bool()); + return RTAny::from_bool(ret); } else { LOG(FATAL) << "not support..." << static_cast(logic_); } @@ -538,6 +550,9 @@ static std::unique_ptr build_expr( } else if (key->type() == RTAnyType::kI32Value) { return std::make_unique>(txn, ctx, std::move(key), rhs.const_()); + } else if (key->type() == RTAnyType::kStringValue) { + return std::make_unique>( + txn, ctx, std::move(key), rhs.const_()); } else { LOG(FATAL) << "not support"; } diff --git a/flex/engines/graph_db/runtime/adhoc/expr_impl.h b/flex/engines/graph_db/runtime/adhoc/expr_impl.h index a3d5af79bfc0..20a764de0696 100644 --- a/flex/engines/graph_db/runtime/adhoc/expr_impl.h +++ b/flex/engines/graph_db/runtime/adhoc/expr_impl.h @@ -97,15 +97,27 @@ class WithInExpr : public ExprBase { for (size_t idx = 0; idx < len; ++idx) { container_.push_back(array.i32_array().item(idx)); } + } else if constexpr (std::is_same_v) { + CHECK(array.item_case() == common::Value::kStrArray); + size_t len = array.str_array().item_size(); + for (size_t idx = 0; idx < len; ++idx) { + container_.push_back(array.str_array().item(idx)); + } } else { LOG(FATAL) << "not implemented"; } } RTAny eval_path(size_t idx) const override { - auto val = TypedConverter::to_typed(key_->eval_path(idx)); - return RTAny::from_bool(std::find(container_.begin(), container_.end(), - val) != container_.end()); + if constexpr (std::is_same_v) { + auto val = std::string(key_->eval_path(idx).as_string()); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } else { + auto val = TypedConverter::to_typed(key_->eval_path(idx)); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } } RTAny eval_path(size_t idx, int) const override { @@ -113,14 +125,18 @@ class WithInExpr : public ExprBase { if (any_val.is_null()) { return RTAny::from_bool(false); } - auto val = TypedConverter::to_typed(any_val); - return RTAny::from_bool(std::find(container_.begin(), container_.end(), - val) != container_.end()); + return eval_path(idx); } RTAny eval_vertex(label_t label, vid_t v, size_t idx) const override { - auto val = TypedConverter::to_typed(key_->eval_vertex(label, v, idx)); - return RTAny::from_bool(std::find(container_.begin(), container_.end(), - val) != container_.end()); + if constexpr (std::is_same_v) { + auto val = std::string(key_->eval_vertex(label, v, idx).as_string()); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } else { + auto val = TypedConverter::to_typed(key_->eval_vertex(label, v, idx)); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } } RTAny eval_vertex(label_t label, vid_t v, size_t idx, int) const override { @@ -128,14 +144,22 @@ class WithInExpr : public ExprBase { if (any_val.is_null()) { return RTAny::from_bool(false); } - auto val = TypedConverter::to_typed(any_val); - return RTAny::from_bool(std::find(container_.begin(), container_.end(), - val) != container_.end()); + return eval_vertex(label, v, idx); } RTAny eval_edge(const LabelTriplet& label, vid_t src, vid_t dst, const Any& data, size_t idx) const override { - LOG(FATAL) << "not implemented"; + if constexpr (std::is_same_v) { + auto val = + std::string(key_->eval_edge(label, src, dst, data, idx).as_string()); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } else { + auto val = TypedConverter::to_typed( + key_->eval_edge(label, src, dst, data, idx)); + return RTAny::from_bool(std::find(container_.begin(), container_.end(), + val) != container_.end()); + } return RTAny::from_bool(false); } RTAnyType type() const override { return RTAnyType::kBoolValue; } diff --git a/flex/engines/graph_db/runtime/adhoc/var.cc b/flex/engines/graph_db/runtime/adhoc/var.cc index fa15f61a4f41..f2fa916109a2 100644 --- a/flex/engines/graph_db/runtime/adhoc/var.cc +++ b/flex/engines/graph_db/runtime/adhoc/var.cc @@ -76,7 +76,9 @@ Var::Var(const ReadTransaction& txn, const Context& ctx, if (pb.has_property()) { auto& pt = pb.property(); if (pt.has_key()) { - getter_ = create_edge_property_path_accessor(ctx, tag, type_); + auto name = pt.key().name(); + getter_ = + create_edge_property_path_accessor(txn, name, ctx, tag, type_); } else if (pt.has_label()) { getter_ = create_edge_label_path_accessor(ctx, tag); } else { @@ -125,7 +127,8 @@ Var::Var(const ReadTransaction& txn, const Context& ctx, if (pb.has_property()) { auto& pt = pb.property(); if (pt.has_key()) { - getter_ = create_edge_property_edge_accessor(type_); + auto name = pt.key().name(); + getter_ = create_edge_property_edge_accessor(txn, name, type_); } else { LOG(FATAL) << "not support"; } diff --git a/flex/engines/graph_db/runtime/common/accessors.cc b/flex/engines/graph_db/runtime/common/accessors.cc index de6f19986a6f..55712fc1bb72 100644 --- a/flex/engines/graph_db/runtime/common/accessors.cc +++ b/flex/engines/graph_db/runtime/common/accessors.cc @@ -104,23 +104,68 @@ std::shared_ptr create_vertex_property_vertex_accessor( } std::shared_ptr create_edge_property_path_accessor( - const Context& ctx, int tag, RTAnyType type) { - switch (type.type_enum_) { - case RTAnyType::RTAnyTypeImpl::kI64Value: - return std::make_shared>(ctx, tag); - case RTAnyType::RTAnyTypeImpl::kI32Value: - return std::make_shared>(ctx, tag); - case RTAnyType::RTAnyTypeImpl::kU64Value: - return std::make_shared>(ctx, tag); - case RTAnyType::RTAnyTypeImpl::kStringValue: - return std::make_shared>(ctx, - tag); - case RTAnyType::RTAnyTypeImpl::kDate32: - return std::make_shared>(ctx, tag); - case RTAnyType::RTAnyTypeImpl::kF64Value: - return std::make_shared>(ctx, tag); - default: - LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); + const ReadTransaction& txn, const std::string& name, const Context& ctx, + int tag, RTAnyType type) { + auto col = std::dynamic_pointer_cast(ctx.get(tag)); + const auto& labels = col->get_labels(); + bool multip_properties = false; + if (txn.schema().has_multi_props_edge()) { + for (auto label : labels) { + auto& properties = txn.schema().get_edge_properties( + label.src_label, label.dst_label, label.edge_label); + if (properties.size() > 1) { + multip_properties = true; + break; + } + } + } + if (multip_properties) { + switch (type.type_enum_) { + case RTAnyType::RTAnyTypeImpl::kI64Value: + return std::make_shared>( + txn, name, ctx, tag); + case RTAnyType::RTAnyTypeImpl::kI32Value: + return std::make_shared>( + txn, name, ctx, tag); + case RTAnyType::RTAnyTypeImpl::kU64Value: + return std::make_shared>( + txn, name, ctx, tag); + case RTAnyType::RTAnyTypeImpl::kStringValue: + return std::make_shared< + MultiPropsEdgePropertyPathAccessor>(txn, name, ctx, + tag); + case RTAnyType::RTAnyTypeImpl::kDate32: + return std::make_shared>( + txn, name, ctx, tag); + case RTAnyType::RTAnyTypeImpl::kF64Value: + return std::make_shared>( + txn, name, ctx, tag); + default: + LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); + } + } else { + switch (type.type_enum_) { + case RTAnyType::RTAnyTypeImpl::kI64Value: + return std::make_shared>(txn, name, ctx, + tag); + case RTAnyType::RTAnyTypeImpl::kI32Value: + return std::make_shared>(txn, name, ctx, + tag); + case RTAnyType::RTAnyTypeImpl::kU64Value: + return std::make_shared>(txn, name, + ctx, tag); + case RTAnyType::RTAnyTypeImpl::kStringValue: + return std::make_shared>( + txn, name, ctx, tag); + case RTAnyType::RTAnyTypeImpl::kDate32: + return std::make_shared>(txn, name, ctx, + tag); + case RTAnyType::RTAnyTypeImpl::kF64Value: + return std::make_shared>(txn, name, ctx, + tag); + default: + LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); + } } return nullptr; } @@ -130,22 +175,53 @@ std::shared_ptr create_edge_label_path_accessor(const Context& ctx, return std::make_shared(ctx, tag); } -std::shared_ptr create_edge_property_edge_accessor(RTAnyType type) { - switch (type.type_enum_) { - case RTAnyType::RTAnyTypeImpl::kI64Value: - return std::make_shared>(); - case RTAnyType::RTAnyTypeImpl::kI32Value: - return std::make_shared>(); - case RTAnyType::RTAnyTypeImpl::kU64Value: - return std::make_shared>(); - case RTAnyType::RTAnyTypeImpl::kStringValue: - return std::make_shared>(); - case RTAnyType::RTAnyTypeImpl::kDate32: - return std::make_shared>(); - case RTAnyType::RTAnyTypeImpl::kF64Value: - return std::make_shared>(); - default: - LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); +std::shared_ptr create_edge_property_edge_accessor( + const ReadTransaction& txn, const std::string& prop_name, RTAnyType type) { + bool multip_properties = txn.schema().has_multi_props_edge(); + + if (multip_properties) { + switch (type.type_enum_) { + case RTAnyType::RTAnyTypeImpl::kI64Value: + return std::make_shared>( + txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kI32Value: + return std::make_shared>( + txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kU64Value: + return std::make_shared>( + txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kStringValue: + return std::make_shared< + MultiPropsEdgePropertyEdgeAccessor>(txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kDate32: + return std::make_shared>( + txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kF64Value: + return std::make_shared>( + txn, prop_name); + default: + LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); + } + } else { + switch (type.type_enum_) { + case RTAnyType::RTAnyTypeImpl::kI64Value: + return std::make_shared>(txn, + prop_name); + case RTAnyType::RTAnyTypeImpl::kI32Value: + return std::make_shared>(txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kU64Value: + return std::make_shared>(txn, + prop_name); + case RTAnyType::RTAnyTypeImpl::kStringValue: + return std::make_shared>( + txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kDate32: + return std::make_shared>(txn, prop_name); + case RTAnyType::RTAnyTypeImpl::kF64Value: + return std::make_shared>(txn, prop_name); + default: + LOG(FATAL) << "not implemented - " << static_cast(type.type_enum_); + } } return nullptr; } diff --git a/flex/engines/graph_db/runtime/common/accessors.h b/flex/engines/graph_db/runtime/common/accessors.h index 31b6faaf9f75..3d8f24128339 100644 --- a/flex/engines/graph_db/runtime/common/accessors.h +++ b/flex/engines/graph_db/runtime/common/accessors.h @@ -395,11 +395,14 @@ class EdgeIdPathAccessor : public IAccessor { private: const IEdgeColumn& edge_col_; }; + template class EdgePropertyPathAccessor : public IAccessor { public: using elem_t = T; - EdgePropertyPathAccessor(const Context& ctx, int tag) + EdgePropertyPathAccessor(const ReadTransaction& txn, + const std::string& prop_name, const Context& ctx, + int tag) : col_(*std::dynamic_pointer_cast(ctx.get(tag))) {} RTAny eval_path(size_t idx) const override { @@ -431,6 +434,94 @@ class EdgePropertyPathAccessor : public IAccessor { const IEdgeColumn& col_; }; +template +class MultiPropsEdgePropertyPathAccessor : public IAccessor { + public: + using elem_t = T; + MultiPropsEdgePropertyPathAccessor(const ReadTransaction& txn, + const std::string& prop_name, + const Context& ctx, int tag) + : col_(*std::dynamic_pointer_cast(ctx.get(tag))) { + const auto& labels = col_.get_labels(); + vertex_label_num_ = txn.schema().vertex_label_num(); + edge_label_num_ = txn.schema().edge_label_num(); + prop_index_.resize( + 2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_, + std::numeric_limits::max()); + for (auto& label : labels) { + size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ + + label.dst_label * edge_label_num_ + label.edge_label; + const auto& names = txn.schema().get_edge_property_names( + label.src_label, label.dst_label, label.edge_label); + for (size_t i = 0; i < names.size(); ++i) { + if (names[i] == prop_name) { + prop_index_[idx] = i; + break; + } + } + } + } + + RTAny eval_path(size_t idx) const override { + const auto& e = col_.get_edge(idx); + auto val = std::get<3>(e); + auto id = get_index(std::get<0>(e)); + if (std::get<3>(e).type != PropertyType::RecordView()) { + CHECK(id == 0); + return RTAny(val); + } else { + auto rv = val.AsRecordView(); + CHECK(id != std::numeric_limits::max()); + return RTAny(rv[id]); + } + } + + elem_t typed_eval_path(size_t idx) const { + const auto& e = col_.get_edge(idx); + auto val = std::get<3>(e); + auto id = get_index(std::get<0>(e)); + if (std::get<3>(e).type != PropertyType::RecordView()) { + CHECK(id == 0); + elem_t ret; + ConvertAny::to(val, ret); + return ret; + + } else { + auto rv = val.AsRecordView(); + CHECK(id != std::numeric_limits::max()); + auto tmp = rv[id]; + elem_t ret; + ConvertAny::to(tmp, ret); + return ret; + } + } + + bool is_optional() const override { return col_.is_optional(); } + + size_t get_index(const LabelTriplet& label) const { + size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ + + label.dst_label * edge_label_num_ + label.edge_label; + return prop_index_[idx]; + } + + RTAny eval_path(size_t idx, int) const override { + if (!col_.has_value(idx)) { + return RTAny(RTAnyType::kNull); + } + return eval_path(idx); + } + + std::shared_ptr builder() const override { + return col_.builder(); + } + + private: + const IEdgeColumn& col_; + std::vector prop_index_; + size_t vertex_label_num_; + size_t edge_label_num_; +}; + class EdgeLabelPathAccessor : public IAccessor { public: using elem_t = int32_t; @@ -459,7 +550,8 @@ template class EdgePropertyEdgeAccessor : public IAccessor { public: using elem_t = T; - EdgePropertyEdgeAccessor() {} + EdgePropertyEdgeAccessor(const ReadTransaction& txn, + const std::string& name) {} elem_t typed_eval_edge(const LabelTriplet& label, vid_t src, vid_t dst, const Any& data, size_t idx) const { @@ -479,6 +571,79 @@ class EdgePropertyEdgeAccessor : public IAccessor { } }; +template +class MultiPropsEdgePropertyEdgeAccessor : public IAccessor { + public: + using elem_t = T; + MultiPropsEdgePropertyEdgeAccessor(const ReadTransaction& txn, + const std::string& name) { + edge_label_num_ = txn.schema().edge_label_num(); + vertex_label_num_ = txn.schema().vertex_label_num(); + indexs.resize(2 * vertex_label_num_ * vertex_label_num_ * edge_label_num_, + std::numeric_limits::max()); + for (label_t src_label = 0; src_label < vertex_label_num_; ++src_label) { + auto src = txn.schema().get_vertex_label_name(src_label); + for (label_t dst_label = 0; dst_label < vertex_label_num_; ++dst_label) { + auto dst = txn.schema().get_vertex_label_name(dst_label); + for (label_t edge_label = 0; edge_label < edge_label_num_; + ++edge_label) { + auto edge = txn.schema().get_edge_label_name(edge_label); + if (!txn.schema().exist(src, dst, edge)) { + continue; + } + size_t idx = src_label * vertex_label_num_ * edge_label_num_ + + dst_label * edge_label_num_ + edge_label; + const std::vector& names = + txn.schema().get_edge_property_names(src_label, dst_label, + edge_label); + for (size_t i = 0; i < names.size(); ++i) { + if (names[i] == name) { + indexs[idx] = i; + break; + } + } + } + } + } + } + + elem_t typed_eval_edge(const LabelTriplet& label, vid_t src, vid_t dst, + const Any& data, size_t idx) const { + T ret; + if (data.type != PropertyType::RecordView()) { + CHECK(get_index(label) == 0); + ConvertAny::to(data, ret); + } else { + auto id = get_index(label); + CHECK(id != std::numeric_limits::max()); + auto view = data.AsRecordView(); + ConvertAny::to(view[id], ret); + } + return ret; + } + + RTAny eval_path(size_t idx) const override { + LOG(FATAL) << "not supposed to reach here..."; + return RTAny(); + } + + RTAny eval_edge(const LabelTriplet& label, vid_t src, vid_t dst, + const Any& data, size_t idx) const override { + return RTAny(typed_eval_edge(label, src, dst, data, idx)); + } + + size_t get_index(const LabelTriplet& label) const { + size_t idx = label.src_label * vertex_label_num_ * edge_label_num_ + + label.dst_label * edge_label_num_ + label.edge_label; + return indexs[idx]; + } + + private: + std::vector indexs; + size_t vertex_label_num_; + size_t edge_label_num_; +}; + template class ParamAccessor : public IAccessor { public: @@ -591,12 +756,14 @@ std::shared_ptr create_vertex_label_path_accessor(const Context& ctx, int tag); std::shared_ptr create_edge_property_path_accessor( + const ReadTransaction& txn, const std::string& prop_name, const Context& ctx, int tag, RTAnyType type); std::shared_ptr create_edge_label_path_accessor(const Context& ctx, int tag); -std::shared_ptr create_edge_property_edge_accessor(RTAnyType type); +std::shared_ptr create_edge_property_edge_accessor( + const ReadTransaction& txn, const std::string& prop_name, RTAnyType type); } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/columns/edge_columns.cc b/flex/engines/graph_db/runtime/common/columns/edge_columns.cc index f9c0a3e1d396..35d0acce0e84 100644 --- a/flex/engines/graph_db/runtime/common/columns/edge_columns.cc +++ b/flex/engines/graph_db/runtime/common/columns/edge_columns.cc @@ -20,7 +20,12 @@ namespace gs { namespace runtime { std::shared_ptr SDSLEdgeColumn::dup() const { - SDSLEdgeColumnBuilder builder(dir_, label_, prop_type_); + std::vector sub_types; + if (prop_type_ == PropertyType::kRecordView) { + sub_types = std::dynamic_pointer_cast>(prop_col_) + ->sub_types(); + } + SDSLEdgeColumnBuilder builder(dir_, label_, prop_type_, sub_types); builder.reserve(edges_.size()); for (size_t i = 0; i < edges_.size(); ++i) { auto e = get_edge(i); @@ -31,7 +36,12 @@ std::shared_ptr SDSLEdgeColumn::dup() const { std::shared_ptr SDSLEdgeColumn::shuffle( const std::vector& offsets) const { - SDSLEdgeColumnBuilder builder(dir_, label_, prop_type_); + std::vector sub_types; + if (prop_type_ == PropertyType::kRecordView) { + sub_types = std::dynamic_pointer_cast>(prop_col_) + ->sub_types(); + } + SDSLEdgeColumnBuilder builder(dir_, label_, prop_type_, sub_types); size_t new_row_num = offsets.size(); builder.reserve(new_row_num); @@ -56,8 +66,11 @@ std::shared_ptr SDSLEdgeColumn::shuffle( } std::shared_ptr SDSLEdgeColumnBuilder::finish() { - auto ret = std::make_shared(dir_, label_, prop_type_); + auto ret = + std::make_shared(dir_, label_, prop_type_, sub_types_); ret->edges_.swap(edges_); + // shrink to fit + prop_col_->resize(edges_.size()); ret->prop_col_ = prop_col_; return ret; } diff --git a/flex/engines/graph_db/runtime/common/columns/edge_columns.h b/flex/engines/graph_db/runtime/common/columns/edge_columns.h index cc58291ab636..474ffaf9d7b9 100644 --- a/flex/engines/graph_db/runtime/common/columns/edge_columns.h +++ b/flex/engines/graph_db/runtime/common/columns/edge_columns.h @@ -50,11 +50,12 @@ class OptionalSDSLEdgeColumnBuilder; class SDSLEdgeColumn : public IEdgeColumn { public: SDSLEdgeColumn(Direction dir, const LabelTriplet& label, - PropertyType prop_type) + PropertyType prop_type, + const std::vector& sub_types = {}) : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { + prop_col_(CreateColumn(prop_type, StorageStrategy::kMem, sub_types)) { prop_col_->open_in_memory(""); } @@ -633,11 +634,14 @@ class BDMLEdgeColumn : public IEdgeColumn { class SDSLEdgeColumnBuilder : public IContextColumnBuilder { public: SDSLEdgeColumnBuilder(Direction dir, const LabelTriplet& label, - PropertyType prop_type) + PropertyType prop_type, + const std::vector& sub_types = {}) : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { + prop_col_(CreateColumn(prop_type, StorageStrategy::kMem, sub_types)), + sub_types_(sub_types), + cap_(0) { prop_col_->open_in_memory(""); } ~SDSLEdgeColumnBuilder() = default; @@ -649,8 +653,16 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder { } void push_back_opt(vid_t src, vid_t dst, const Any& data) { edges_.emplace_back(src, dst); + size_t len = edges_.size(); - prop_col_->resize(len); + + if (cap_ == 0) { + prop_col_->resize(len); + cap_ = len; + } else if (len >= cap_) { + prop_col_->resize(len * 2); + cap_ = len * 2; + } prop_col_->set_any(len - 1, data); } void push_back_endpoints(vid_t src, vid_t dst) { @@ -666,6 +678,8 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder { std::vector> edges_; PropertyType prop_type_; std::shared_ptr prop_col_; + std::vector sub_types_; + size_t cap_; }; class BDSLEdgeColumnBuilder : public IContextColumnBuilder { diff --git a/flex/engines/graph_db/runtime/common/operators/edge_expand.cc b/flex/engines/graph_db/runtime/common/operators/edge_expand.cc index 3179069ca79f..b38da41e2570 100644 --- a/flex/engines/graph_db/runtime/common/operators/edge_expand.cc +++ b/flex/engines/graph_db/runtime/common/operators/edge_expand.cc @@ -63,11 +63,13 @@ Context EdgeExpand::expand_edge_without_predicate( PropertyType pt = PropertyType::kEmpty; if (props.size() > 1) { pt = PropertyType::kRecordView; + } else if (!props.empty()) { pt = props[0]; } - SDSLEdgeColumnBuilder builder(Direction::kIn, params.labels[0], pt); + SDSLEdgeColumnBuilder builder(Direction::kIn, params.labels[0], pt, + props); label_t dst_label = params.labels[0].dst_label; foreach_vertex(input_vertex_list, @@ -99,11 +101,16 @@ Context EdgeExpand::expand_edge_without_predicate( params.labels[0].src_label, params.labels[0].dst_label, params.labels[0].edge_label); PropertyType pt = PropertyType::kEmpty; + if (!props.empty()) { pt = props[0]; } + if (props.size() > 1) { + pt = PropertyType::kRecordView; + } - SDSLEdgeColumnBuilder builder(Direction::kOut, params.labels[0], pt); + SDSLEdgeColumnBuilder builder(Direction::kOut, params.labels[0], pt, + props); label_t src_label = params.labels[0].src_label; foreach_vertex(input_vertex_list, [&](size_t index, label_t label, vid_t v) { @@ -170,6 +177,7 @@ Context EdgeExpand::expand_edge_without_predicate( auto labels = get_expand_label_set(txn, label_set, params.labels, params.dir); std::vector> label_props; + std::vector> props_vec; for (auto& triplet : labels) { auto& props = txn.schema().get_edge_properties( triplet.src_label, triplet.dst_label, triplet.edge_label); @@ -177,6 +185,10 @@ Context EdgeExpand::expand_edge_without_predicate( if (!props.empty()) { pt = props[0]; } + if (props.size() > 1) { + pt = PropertyType::kRecordView; + } + props_vec.emplace_back(props); label_props.emplace_back(triplet, pt); } if (params.dir == Direction::kOut || params.dir == Direction::kIn) { @@ -186,7 +198,7 @@ Context EdgeExpand::expand_edge_without_predicate( if (params.dir == Direction::kOut) { auto& triplet = labels[0]; SDSLEdgeColumnBuilder builder(Direction::kOut, triplet, - label_props[0].second); + label_props[0].second, props_vec[0]); foreach_vertex( input_vertex_list, [&](size_t index, label_t label, vid_t v) { if (label == triplet.src_label) { @@ -206,7 +218,7 @@ Context EdgeExpand::expand_edge_without_predicate( } else if (params.dir == Direction::kIn) { auto& triplet = labels[0]; SDSLEdgeColumnBuilder builder(Direction::kIn, triplet, - label_props[0].second); + label_props[0].second, props_vec[0]); foreach_vertex( input_vertex_list, [&](size_t index, label_t label, vid_t v) { if (label == triplet.dst_label) { diff --git a/flex/engines/graph_db/runtime/common/operators/edge_expand.h b/flex/engines/graph_db/runtime/common/operators/edge_expand.h index 90184a6ac50f..ed1e8bb7feeb 100644 --- a/flex/engines/graph_db/runtime/common/operators/edge_expand.h +++ b/flex/engines/graph_db/runtime/common/operators/edge_expand.h @@ -56,8 +56,12 @@ class EdgeExpand { if (!props.empty()) { pt = props[0]; } + if (props.size() > 1) { + pt = PropertyType::kRecordView; + } - SDSLEdgeColumnBuilder builder(Direction::kIn, params.labels[0], pt); + SDSLEdgeColumnBuilder builder(Direction::kIn, params.labels[0], pt, + props); foreach_vertex(input_vertex_list, [&](size_t index, label_t label, vid_t v) { @@ -91,8 +95,12 @@ class EdgeExpand { if (!props.empty()) { pt = props[0]; } + if (props.size() > 1) { + pt = PropertyType::kRecordView; + } - SDSLEdgeColumnBuilder builder(Direction::kOut, params.labels[0], pt); + SDSLEdgeColumnBuilder builder(Direction::kOut, params.labels[0], pt, + props); foreach_vertex(input_vertex_list, [&](size_t index, label_t label, vid_t v) { diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 4326858a4836..35c88020167f 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -24,7 +24,7 @@ namespace gs { throw std::runtime_error(msg); \ } -Schema::Schema() = default; +Schema::Schema() : has_multi_props_edge_(false) {}; Schema::~Schema() = default; void Schema::Clear() { @@ -44,6 +44,7 @@ void Schema::Clear() { max_vnum_.clear(); plugin_name_to_path_and_id_.clear(); plugin_dir_.clear(); + has_multi_props_edge_ = false; } void Schema::add_vertex_label( @@ -79,6 +80,9 @@ void Schema::add_edge_label(const std::string& src_label, uint32_t label_id = generate_edge_label(src_label_id, dst_label_id, edge_label_id); eproperties_[label_id] = properties; + if (properties.size() > 1) { + has_multi_props_edge_ = true; + } oe_strategy_[label_id] = oe; ie_strategy_[label_id] = ie; oe_mutability_[label_id] = oe_mutable; @@ -393,6 +397,13 @@ void Schema::Deserialize(std::unique_ptr& reader) { eproperties_ >> eprop_names_ >> ie_strategy_ >> oe_strategy_ >> ie_mutability_ >> oe_mutability_ >> sort_on_compactions_ >> max_vnum_ >> v_descriptions_ >> e_descriptions_ >> description_ >> version_; + has_multi_props_edge_ = false; + for (auto& eprops : eproperties_) { + if (eprops.second.size() > 1) { + has_multi_props_edge_ = true; + break; + } + } } label_t Schema::vertex_label_to_index(const std::string& label) { @@ -1316,6 +1327,8 @@ void Schema::SetDescription(const std::string& description) { void Schema::SetVersion(const std::string& version) { version_ = version; } std::string Schema::GetVersion() const { return version_; } +bool Schema::has_multi_props_edge() const { return has_multi_props_edge_; } + // check whether prop in vprop_names, or is the primary key bool Schema::vertex_has_property(const std::string& label, const std::string& prop) const { diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h index f2ce4107e0d8..bca189dd603a 100644 --- a/flex/storages/rt_mutable_graph/schema.h +++ b/flex/storages/rt_mutable_graph/schema.h @@ -226,6 +226,8 @@ class Schema { std::string GetVersion() const; + bool has_multi_props_edge() const; + private: label_t vertex_label_to_index(const std::string& label); @@ -257,6 +259,7 @@ class Schema { std::string plugin_dir_; std::string description_; std::string version_; + bool has_multi_props_edge_; }; } // namespace gs diff --git a/flex/utils/property/column.cc b/flex/utils/property/column.cc index 85a1878c9a6b..a2581be4caa7 100644 --- a/flex/utils/property/column.cc +++ b/flex/utils/property/column.cc @@ -15,6 +15,7 @@ #include "flex/utils/property/column.h" #include "flex/utils/id_indexer.h" +#include "flex/utils/property/table.h" #include "flex/utils/property/types.h" #include "grape/serialization/out_archive.h" @@ -108,9 +109,11 @@ using BoolEmptyColumn = TypedEmptyColumn; using FloatEmptyColumn = TypedEmptyColumn; using DoubleEmptyColumn = TypedEmptyColumn; using StringEmptyColumn = TypedEmptyColumn; +using RecordViewEmptyColumn = TypedEmptyColumn; -std::shared_ptr CreateColumn(PropertyType type, - StorageStrategy strategy) { +std::shared_ptr CreateColumn( + PropertyType type, StorageStrategy strategy, + const std::vector& sub_types) { if (strategy == StorageStrategy::kNone) { if (type == PropertyType::kBool) { return std::make_shared(); @@ -171,6 +174,8 @@ std::shared_ptr CreateColumn(PropertyType type, } else if (type.type_enum == impl::PropertyTypeImpl::kVarChar) { return std::make_shared( strategy, type.additional_type_info.max_length); + } else if (type.type_enum == impl::PropertyTypeImpl::kRecordView) { + return std::make_shared(sub_types); } else { LOG(FATAL) << "unexpected type to create column, " << static_cast(type.type_enum); @@ -225,4 +230,45 @@ std::shared_ptr CreateRefColumn( } } +void TypedColumn::open_in_memory(const std::string& name) { + table_ = std::make_shared(); + std::vector col_names; + for (size_t i = 0; i < types_.size(); ++i) { + col_names.emplace_back("col_" + std::to_string(i)); + } + table_->open_in_memory(name, "", col_names, types_, {}); +} + +size_t TypedColumn::size() const { return table_->row_num(); } + +void TypedColumn::resize(size_t size) { table_->resize(size); } + +void TypedColumn::close() { table_->close(); } + +void TypedColumn::set_any(size_t index, const Any& value) { + auto rv = value.AsRecordView(); + set_value(index, rv); +} + +void TypedColumn::set_value(size_t index, const RecordView& val) { + std::vector vec; + auto& cols = table_->columns(); + for (size_t i = 0; i < val.size(); ++i) { + if (cols[i]->type() == PropertyType::kStringView) { + (dynamic_cast*>(cols[i].get())) + ->set_value_with_check(index, val[i].AsStringView()); + } else { + cols[i]->set_any(index, val[i]); + } + } +} + +Any TypedColumn::get(size_t index) const { + return Any(RecordView(index, table_.get())); +} + +RecordView TypedColumn::get_view(size_t index) const { + return RecordView(index, table_.get()); +} + } // namespace gs diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index f694a1b05406..a556857e77cc 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -227,6 +227,71 @@ class TypedColumn : public ColumnBase { StorageStrategy strategy_; }; +template <> +class TypedColumn : public ColumnBase { + public: + TypedColumn(const std::vector& types) : types_(types) { + if (types.size() == 0) { + LOG(FATAL) << "RecordView column must have sub types."; + } + } + + ~TypedColumn() { close(); } + + void open(const std::string& name, const std::string& snapshot_dir, + const std::string& work_dir) override { + LOG(FATAL) << "RecordView column does not support open."; + } + + void open_in_memory(const std::string& name) override; + + void open_with_hugepages(const std::string& name, bool force) override { + LOG(FATAL) << "RecordView column does not support open with hugepages."; + } + + void touch(const std::string& filename) override { + LOG(FATAL) << "RecordView column does not support touch."; + } + + void dump(const std::string& filename) override { + LOG(FATAL) << "RecordView column does not support dump."; + } + + void copy_to_tmp(const std::string& cur_path, + const std::string& tmp_path) override { + LOG(FATAL) << "RecordView column does not support copy_to_tmp."; + } + void close() override; + + size_t size() const override; + void resize(size_t size) override; + + PropertyType type() const override { return PropertyType::kRecordView; } + + void set_any(size_t index, const Any& value) override; + + void set_value(size_t index, const RecordView& val); + + RecordView get_view(size_t index) const; + + Any get(size_t index) const override; + + void ingest(uint32_t index, grape::OutArchive& arc) override { + LOG(FATAL) << "RecordView column does not support ingest."; + } + + StorageStrategy storage_strategy() const override { + LOG(ERROR) << "RecordView column does not have storage strategy."; + return StorageStrategy::kMem; + } + + std::vector sub_types() const { return types_; } + + private: + std::vector types_; + std::shared_ptr
table_; +}; + using BoolColumn = TypedColumn; using IntColumn = TypedColumn; using UIntColumn = TypedColumn; @@ -236,6 +301,7 @@ using DateColumn = TypedColumn; using DayColumn = TypedColumn; using DoubleColumn = TypedColumn; using FloatColumn = TypedColumn; +using RecordViewColumn = TypedColumn; template <> class TypedColumn : public ColumnBase { @@ -414,9 +480,12 @@ class TypedColumn : public ColumnBase { size_t basic_avg_width = (basic_buffer_.data_size() + basic_buffer_.size() - 1) / basic_buffer_.size(); - extra_buffer_.resize(extra_size_, extra_size_ * basic_avg_width); + // extra_size_ * basic_avg_width may be smaller than pos_.load() + extra_buffer_.resize( + extra_size_, std::max(extra_size_ * basic_avg_width, pos_.load())); } else { - extra_buffer_.resize(extra_size_, extra_size_ * width_); + extra_buffer_.resize(extra_size_, + std::max(extra_size_ * width_, pos_.load())); } } // resize `data` of basic_buffer @@ -445,6 +514,25 @@ class TypedColumn : public ColumnBase { set_value(idx, value.AsStringView()); } + // make sure there is enough space for the value + void set_value_with_check(size_t idx, const std::string_view& value) { + if (idx >= basic_size_ && idx < basic_size_ + extra_size_) { + size_t offset = pos_.fetch_add(value.size()); + if (pos_.load() > extra_buffer_.data_size()) { + extra_buffer_.resize(extra_buffer_.size(), pos_.load()); + } + extra_buffer_.set(idx - basic_size_, offset, value); + } else if (idx < basic_size_) { + size_t offset = basic_pos_.fetch_add(value.size()); + if (basic_pos_.load() > basic_buffer_.data_size()) { + basic_buffer_.resize(basic_buffer_.size(), basic_pos_.load()); + } + basic_buffer_.set(idx, offset, value); + } else { + LOG(FATAL) << "Index out of range"; + } + } + std::string_view get_view(size_t idx) const { return idx < basic_size_ ? basic_buffer_.get(idx) : extra_buffer_.get(idx - basic_size_); @@ -618,7 +706,8 @@ void StringMapColumn::set_value(size_t idx, using DefaultStringMapColumn = StringMapColumn; std::shared_ptr CreateColumn( - PropertyType type, StorageStrategy strategy = StorageStrategy::kMem); + PropertyType type, StorageStrategy strategy = StorageStrategy::kMem, + const std::vector& sub_types = {}); #ifdef USE_PTHASH template