Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
AL-9920 Disable agg removal to prevent row count inconsistencies in query results for Kylin
Browse files Browse the repository at this point in the history
gleonSun committed Sep 27, 2024
1 parent 688fd2a commit 3217499
Showing 1 changed file with 99 additions and 97 deletions.
196 changes: 99 additions & 97 deletions core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java
Original file line number Diff line number Diff line change
@@ -20,12 +20,10 @@
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Exchange;
@@ -992,6 +990,10 @@ public TrimResult trimFields(
Aggregate aggregate,
ImmutableBitSet fieldsUsed,
Set<RelDataTypeField> extraFields) {
// Kylin: skip field trimming for Aggregate nodes and return the aggregate unchanged
// with an identity mapping, to prevent row count inconsistencies in query results.
// See https://olapio.atlassian.net/browse/LT-1420
return result(aggregate, Mappings.createIdentity(aggregate.getRowType().getFieldCount()));

// Fields:
//
// | sys fields | group fields | indicator fields | agg functions |
@@ -1005,101 +1007,101 @@ public TrimResult trimFields(
//
// But group and indicator fields stay, even if they are not used.

final RelDataType rowType = aggregate.getRowType();

// Compute which input fields are used.
// 1. group fields are always used
final ImmutableBitSet.Builder inputFieldsUsed =
aggregate.getGroupSet().rebuild();
// 2. agg functions
for (AggregateCall aggCall : aggregate.getAggCallList()) {
inputFieldsUsed.addAll(aggCall.getArgList());
if (aggCall.filterArg >= 0) {
inputFieldsUsed.set(aggCall.filterArg);
}
if (aggCall.distinctKeys != null) {
inputFieldsUsed.addAll(aggCall.distinctKeys);
}
inputFieldsUsed.addAll(RelCollations.ordinals(aggCall.collation));
}

// Create input with trimmed columns.
final RelNode input = aggregate.getInput();
final Set<RelDataTypeField> inputExtraFields = Collections.emptySet();
final TrimResult trimResult =
trimChild(aggregate, input, inputFieldsUsed.build(), inputExtraFields);
final RelNode newInput = trimResult.left;
final Mapping inputMapping = trimResult.right;
// We have to return group keys and (if present) indicators.
// So, pretend that the consumer asked for them.
final int groupCount = aggregate.getGroupSet().cardinality();
fieldsUsed =
fieldsUsed.union(ImmutableBitSet.range(groupCount));

// If the input is unchanged, and we need to project all columns,
// there's nothing to do.
if (input == newInput
&& fieldsUsed.equals(ImmutableBitSet.range(rowType.getFieldCount()))) {
return result(aggregate,
Mappings.createIdentity(rowType.getFieldCount()));
}

// Which agg calls are used by our consumer?
int j = groupCount;
int usedAggCallCount = 0;
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
if (fieldsUsed.get(j++)) {
++usedAggCallCount;
}
}

// Offset due to the number of system fields having changed.
Mapping mapping =
Mappings.create(
MappingType.INVERSE_SURJECTION,
rowType.getFieldCount(),
groupCount + usedAggCallCount);

final ImmutableBitSet newGroupSet =
Mappings.apply(inputMapping, aggregate.getGroupSet());

final ImmutableList<ImmutableBitSet> newGroupSets =
ImmutableList.copyOf(
Util.transform(aggregate.getGroupSets(),
input1 -> Mappings.apply(inputMapping, input1)));

// Populate mapping of where to find the fields. System, group key and
// indicator fields first.
for (j = 0; j < groupCount; j++) {
mapping.set(j, j);
}

// Now create new agg calls, and populate mapping for them.
relBuilder.push(newInput);
final List<RelBuilder.AggCall> newAggCallList = new ArrayList<>();
j = groupCount;
for (AggregateCall aggCall : aggregate.getAggCallList()) {
if (fieldsUsed.get(j)) {
mapping.set(j, groupCount + newAggCallList.size());
newAggCallList.add(relBuilder.aggregateCall(aggCall, inputMapping));
}
++j;
}

if (newAggCallList.isEmpty() && newGroupSet.isEmpty()) {
// Add a dummy call if all the column fields have been trimmed
mapping = Mappings.create(
MappingType.INVERSE_SURJECTION,
mapping.getSourceCount(),
1);
newAggCallList.add(relBuilder.count(false, "DUMMY"));
}

final RelBuilder.GroupKey groupKey = relBuilder.groupKey(newGroupSet, newGroupSets);
relBuilder.aggregate(groupKey, newAggCallList);

final RelNode newAggregate = RelOptUtil.propagateRelHints(aggregate, relBuilder.build());
return result(newAggregate, mapping);
// final RelDataType rowType = aggregate.getRowType();
//
// // Compute which input fields are used.
// // 1. group fields are always used
// final ImmutableBitSet.Builder inputFieldsUsed =
// aggregate.getGroupSet().rebuild();
// // 2. agg functions
// for (AggregateCall aggCall : aggregate.getAggCallList()) {
// inputFieldsUsed.addAll(aggCall.getArgList());
// if (aggCall.filterArg >= 0) {
// inputFieldsUsed.set(aggCall.filterArg);
// }
// if (aggCall.distinctKeys != null) {
// inputFieldsUsed.addAll(aggCall.distinctKeys);
// }
// inputFieldsUsed.addAll(RelCollations.ordinals(aggCall.collation));
// }
//
// // Create input with trimmed columns.
// final RelNode input = aggregate.getInput();
// final Set<RelDataTypeField> inputExtraFields = Collections.emptySet();
// final TrimResult trimResult =
// trimChild(aggregate, input, inputFieldsUsed.build(), inputExtraFields);
// final RelNode newInput = trimResult.left;
// final Mapping inputMapping = trimResult.right;
// // We have to return group keys and (if present) indicators.
// // So, pretend that the consumer asked for them.
// final int groupCount = aggregate.getGroupSet().cardinality();
// fieldsUsed =
// fieldsUsed.union(ImmutableBitSet.range(groupCount));
//
// // If the input is unchanged, and we need to project all columns,
// // there's nothing to do.
// if (input == newInput
// && fieldsUsed.equals(ImmutableBitSet.range(rowType.getFieldCount()))) {
// return result(aggregate,
// Mappings.createIdentity(rowType.getFieldCount()));
// }
//
// // Which agg calls are used by our consumer?
// int j = groupCount;
// int usedAggCallCount = 0;
// for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
// if (fieldsUsed.get(j++)) {
// ++usedAggCallCount;
// }
// }
//
// // Offset due to the number of system fields having changed.
// Mapping mapping =
// Mappings.create(
// MappingType.INVERSE_SURJECTION,
// rowType.getFieldCount(),
// groupCount + usedAggCallCount);
//
// final ImmutableBitSet newGroupSet =
// Mappings.apply(inputMapping, aggregate.getGroupSet());
//
// final ImmutableList<ImmutableBitSet> newGroupSets =
// ImmutableList.copyOf(
// Util.transform(aggregate.getGroupSets(),
// input1 -> Mappings.apply(inputMapping, input1)));
//
// // Populate mapping of where to find the fields. System, group key and
// // indicator fields first.
// for (j = 0; j < groupCount; j++) {
// mapping.set(j, j);
// }
//
// // Now create new agg calls, and populate mapping for them.
// relBuilder.push(newInput);
// final List<RelBuilder.AggCall> newAggCallList = new ArrayList<>();
// j = groupCount;
// for (AggregateCall aggCall : aggregate.getAggCallList()) {
// if (fieldsUsed.get(j)) {
// mapping.set(j, groupCount + newAggCallList.size());
// newAggCallList.add(relBuilder.aggregateCall(aggCall, inputMapping));
// }
// ++j;
// }
//
// if (newAggCallList.isEmpty() && newGroupSet.isEmpty()) {
// // Add a dummy call if all the column fields have been trimmed
// mapping = Mappings.create(
// MappingType.INVERSE_SURJECTION,
// mapping.getSourceCount(),
// 1);
// newAggCallList.add(relBuilder.count(false, "DUMMY"));
// }
//
// final RelBuilder.GroupKey groupKey = relBuilder.groupKey(newGroupSet, newGroupSets);
// relBuilder.aggregate(groupKey, newAggCallList);
//
// final RelNode newAggregate = RelOptUtil.propagateRelHints(aggregate, relBuilder.build());
// return result(newAggregate, mapping);
}

/**

0 comments on commit 3217499

Please sign in to comment.