Skip to content

Commit

Permalink
fix(interactive): fix the bug in filtering after intersection in GIE …
Browse files Browse the repository at this point in the history
…Runtime in distributed computation (#3359)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

This pr:
1) fix the bug in filtering after intersection in distributed
computation;
2) optimize the case of filtering by label only that doesn't necessary
to query the storage, in Auxilia operator.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3360

---------

Co-authored-by: Longbin Lai <[email protected]>
Co-authored-by: xiaolei.zl <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2023
1 parent 1f85ffd commit 72b5cdc
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 31 deletions.
7 changes: 6 additions & 1 deletion flex/codegen/src/hqps_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ void build_fused_edge_get_v(
CHECK(vertex_labels.size() > 0);
edge_expand_op.set_expand_opt(
physical::EdgeExpand::ExpandOpt::EdgeExpand_ExpandOpt_VERTEX);
edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value());
if (get_v_op.has_alias()) {
edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value());
} else {
edge_expand_op.mutable_alias()->set_value(-1);
}

ss << _4_SPACES
<< BuildEdgeExpandOp<LabelT>(ctx, edge_expand_op, edge_meta_data,
vertex_labels)
Expand Down
28 changes: 18 additions & 10 deletions flex/engines/hqps_db/database/adj_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ class SinglePropGetter {
using value_type = T;
static constexpr size_t prop_num = 1;
SinglePropGetter() {}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {
CHECK(column.get() != nullptr);
}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {}

inline value_type get_view(vid_t vid) const {
if (vid == NONE) {
if (vid == NONE || column == nullptr) {
return NullRecordCreator<value_type>::GetNull();
}
return column->get_view(vid);
Expand Down Expand Up @@ -149,15 +147,25 @@ class MultiPropGetter {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
}
return get_view(vid, std::make_index_sequence<sizeof...(T)>());
result_tuple_t ret;
fill_result_tuple(ret, vid);
return ret;
}

template <size_t... Is>
inline result_tuple_t get_view(vid_t vid, std::index_sequence<Is...>) const {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
template <size_t I = 0>
inline typename std::enable_if<I == sizeof...(T), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {}

template <size_t I = 0>
inline typename std::enable_if<(I < sizeof...(T)), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {
using cur_ele_t = typename std::tuple_element<I, result_tuple_t>::type;
if (std::get<I>(column) == nullptr) {
std::get<I>(ret) = NullRecordCreator<cur_ele_t>::GetNull();
} else {
std::get<I>(ret) = std::get<I>(column)->get_view(vid);
}
return std::make_tuple(std::get<Is>(column)->get_view(vid)...);
fill_result_tuple<I + 1>(ret, vid);
}

inline MultiPropGetter<T...>& operator=(const MultiPropGetter<T...>& d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest {

public abstract Traversal<Vertex, Long> get_pattern_16_test();

public abstract Traversal<Vertex, Long> get_pattern_17_test();

@Test
public void run_pattern_1_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_1_test();
Expand Down Expand Up @@ -170,6 +172,13 @@ public void run_pattern_16_test() {
Assert.assertEquals(23286L, traversal.next().longValue());
}

@Test
public void run_pattern_17_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_17_test();
this.printTraversalForm(traversal);
Assert.assertEquals(17367L, traversal.next().longValue());
}

public static class Traversals extends PatternQueryTest {

// PM1
Expand Down Expand Up @@ -356,5 +365,26 @@ public Traversal<Vertex, Long> get_pattern_16_test() {
.by("firstName")
.count();
}

@Override
public Traversal<Vertex, Long> get_pattern_17_test() {
return g.V().match(
__.as("a")
.hasLabel("PERSON")
.out("HASINTEREST")
.hasLabel("TAG")
.as("b"),
__.as("a")
.hasLabel("PERSON")
.in("HASCREATOR")
.hasLabel("COMMENT", "POST")
.as("c"),
__.as("c")
.hasLabel("COMMENT", "POST")
.out("HASTAG")
.hasLabel("TAG")
.as("b"))
.count();
}
}
}
25 changes: 18 additions & 7 deletions interactive_engine/executor/ir/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,24 @@ impl From<(pb::EdgeExpand, pb::GetV)> for pb::path_expand::ExpandBase {
}

impl pb::QueryParams {
// is_queryable doesn't consider tables as we assume that the table info can be inferred directly from current data.
pub fn is_queryable(&self) -> bool {
!(self.predicate.is_none()
&& self.limit.is_none()
&& self.sample_ratio == 1.0
&& self.columns.is_empty()
&& !self.is_all_columns)
pub fn has_labels(&self) -> bool {
!self.tables.is_empty()
}

pub fn has_columns(&self) -> bool {
!self.columns.is_empty() || self.is_all_columns
}

pub fn has_predicates(&self) -> bool {
self.predicate.is_some()
}

pub fn has_sample(&self) -> bool {
self.sample_ratio != 1.0
}

pub fn has_limit(&self) -> bool {
self.limit.is_some()
}

pub fn is_empty(&self) -> bool {
Expand Down
8 changes: 7 additions & 1 deletion interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,13 @@ fn is_whole_graph(operator: &pb::logical_plan::Operator) -> bool {
&& scan
.params
.as_ref()
.map(|params| !params.is_queryable() && is_params_all_labels(params))
.map(|params| {
!(params.has_columns()
|| params.has_predicates()
|| params.has_sample()
|| params.has_limit())
&& is_params_all_labels(params)
})
.unwrap_or(true)
}
pb::logical_plan::operator::Opr::Root(_) => true,
Expand Down
16 changes: 11 additions & 5 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ impl AsPhysical for pb::PathExpand {
let getv = getv.unwrap();
if edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Edge as i32 {
let has_getv_filter = if let Some(params) = getv.params.as_ref() {
params.is_queryable() || !params.tables.is_empty()
// In RBO, sample_ratio and limit would be fused into EdgeExpand(Opt=Edge), rather than GetV,
// thus we do not consider them here.
// TODO: Notice that we consider table here since we cannot specify vertex labels in ExpandV
params.has_predicates() || params.has_columns() || params.has_labels()
} else {
false
};
Expand Down Expand Up @@ -448,9 +451,9 @@ fn build_and_try_fuse_get_v(builder: &mut PlanBuilder, mut get_v: pb::GetV) -> I
return Err(IrError::Unsupported("Try to fuse GetV with Opt=Self into ExpandE".to_string()));
}
if let Some(params) = get_v.params.as_mut() {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
return Err(IrError::Unsupported("Try to fuse GetV with predicates into ExpandE".to_string()));
} else if !params.tables.is_empty() {
} else if params.has_labels() {
// although this doesn't need query, it cannot be fused into ExpandExpand since we cannot specify vertex labels in ExpandV
builder.get_v(get_v);
return Ok(());
Expand Down Expand Up @@ -502,7 +505,7 @@ impl AsPhysical for pb::GetV {
let mut getv = self.clone();
// If GetV(Adj) with filter, translate GetV into GetV(GetAdj) + Shuffle (if on distributed storage) + GetV(Self)
if let Some(params) = getv.params.as_mut() {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
let auxilia = pb::GetV {
tag: None,
opt: 4, //ItSelf
Expand Down Expand Up @@ -1069,7 +1072,7 @@ fn add_intersect_job_builder(
// vertex parameter after the intersection
if let Some(params) = get_v.params.as_ref() {
// the case that we need to further process getV's filter.
if params.is_queryable() || !params.tables.is_empty() {
if params.has_predicates() || params.has_columns() || params.has_labels() {
get_v.opt = 4;
auxilia = Some(get_v.clone());
}
Expand All @@ -1089,6 +1092,9 @@ fn add_intersect_job_builder(
// add vertex filters
if let Some(mut auxilia) = auxilia {
auxilia.tag = Some(intersect_tag.clone());
if plan_meta.is_partition() {
builder.shuffle(Some(intersect_tag.clone()));
}
builder.get_v(auxilia);
}
Ok(())
Expand Down
12 changes: 12 additions & 0 deletions interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,16 @@ impl QueryParams {
None
}
}

pub fn has_labels(&self) -> bool {
!self.labels.is_empty()
}

pub fn has_predicates(&self) -> bool {
self.filter.is_some()
}

pub fn has_columns(&self) -> bool {
self.columns.is_some()
}
}
4 changes: 2 additions & 2 deletions interactive_engine/executor/ir/runtime/src/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ impl<P: PartitionInfo, C: ClusterInfo> IRJobAssembly<P, C> {
OpKind::Scan(scan) => {
let udf_gen = self.udf_gen.clone();
stream = stream.flat_map(move |_| {
let scan_iter = udf_gen.gen_source(scan.clone().into()).unwrap();
Ok(scan_iter)
let scan_iter = udf_gen.gen_source(scan.clone().into());
Ok(scan_iter?)
})?;
}
OpKind::Sample(sample) => {
Expand Down
7 changes: 7 additions & 0 deletions interactive_engine/executor/ir/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ impl From<GraphProxyError> for FnGenError {
}
}

impl From<FnGenError> for DynError {
fn from(e: FnGenError) -> Self {
let err: Box<dyn std::error::Error + Send + Sync> = e.into();
err
}
}

impl From<FnGenError> for BuildJobError {
fn from(e: FnGenError) -> Self {
match e {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,22 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
// e.g., for g.V().out().as("a").has("name", "marko"), we should compile as:
// g.V().out().auxilia(as("a"))... where we give alias in auxilia,
// then we set tag=None and alias="a" in auxilia
// 1. filter by labels.
if !self.query_params.labels.is_empty() && entry.label().is_some() {

// 1. If to filter by labels, and the entry itself carries label information already, directly eval it without query the store
if self.query_params.has_labels() && entry.label().is_some() {
if !self
.query_params
.labels
.contains(&entry.label().unwrap())
{
// pruning by labels
return Ok(None);
} else if !self.query_params.has_predicates() && !self.query_params.has_columns() {
// if only filter by labels, directly return the results.
return Ok(Some(input));
}
}
// 2. further fetch properties, e.g., filter by columns.
// 2. Otherwise, filter after query store, e.g., the case of filter by columns.
match entry.get_type() {
EntryType::Vertex => {
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
Expand Down Expand Up @@ -248,7 +253,7 @@ impl FilterMapFuncGen for pb::GetV {
VOpt::Start | VOpt::End | VOpt::Other => {
let mut tables_condition: Vec<LabelId> = vec![];
if let Some(params) = self.params {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
Err(FnGenError::unsupported_error(&format!("QueryParams in GetV {:?}", params)))?
} else {
tables_condition = params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl SourceOperator {
));
}
} else if let Some(pkvs) = &self.primary_key_values {
if self.query_params.labels.is_empty() {
if !self.query_params.has_labels() {
Err(FnGenError::unsupported_error(
"Empty label in `IndexScan` self.query_params.labels",
))?
Expand Down

0 comments on commit 72b5cdc

Please sign in to comment.