Skip to content

Commit

Permalink
AL-9920 Disable agg removal to prevent row count inconsistencies in query results for Kylin
Browse files Browse the repository at this point in the history
  • Loading branch information
gleonSun committed Sep 29, 2024
1 parent 688fd2a commit 4342732
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 98 deletions.
196 changes: 99 additions & 97 deletions core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Exchange;
Expand Down Expand Up @@ -992,6 +990,10 @@ public TrimResult trimFields(
Aggregate aggregate,
ImmutableBitSet fieldsUsed,
Set<RelDataTypeField> extraFields) {
// Kylin: disable aggregate removal to prevent row count inconsistencies in query results.
// The Aggregate is returned unchanged with an identity mapping over its output fields.
// See https://olapio.atlassian.net/browse/LT-1420
return result(aggregate, Mappings.createIdentity(aggregate.getRowType().getFieldCount()));

// Fields:
//
// | sys fields | group fields | indicator fields | agg functions |
Expand All @@ -1005,101 +1007,101 @@ public TrimResult trimFields(
//
// But group and indicator fields stay, even if they are not used.

final RelDataType rowType = aggregate.getRowType();

// Compute which input fields are used.
// 1. group fields are always used
final ImmutableBitSet.Builder inputFieldsUsed =
aggregate.getGroupSet().rebuild();
// 2. agg functions
for (AggregateCall aggCall : aggregate.getAggCallList()) {
inputFieldsUsed.addAll(aggCall.getArgList());
if (aggCall.filterArg >= 0) {
inputFieldsUsed.set(aggCall.filterArg);
}
if (aggCall.distinctKeys != null) {
inputFieldsUsed.addAll(aggCall.distinctKeys);
}
inputFieldsUsed.addAll(RelCollations.ordinals(aggCall.collation));
}

// Create input with trimmed columns.
final RelNode input = aggregate.getInput();
final Set<RelDataTypeField> inputExtraFields = Collections.emptySet();
final TrimResult trimResult =
trimChild(aggregate, input, inputFieldsUsed.build(), inputExtraFields);
final RelNode newInput = trimResult.left;
final Mapping inputMapping = trimResult.right;
// We have to return group keys and (if present) indicators.
// So, pretend that the consumer asked for them.
final int groupCount = aggregate.getGroupSet().cardinality();
fieldsUsed =
fieldsUsed.union(ImmutableBitSet.range(groupCount));

// If the input is unchanged, and we need to project all columns,
// there's nothing to do.
if (input == newInput
&& fieldsUsed.equals(ImmutableBitSet.range(rowType.getFieldCount()))) {
return result(aggregate,
Mappings.createIdentity(rowType.getFieldCount()));
}

// Which agg calls are used by our consumer?
int j = groupCount;
int usedAggCallCount = 0;
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
if (fieldsUsed.get(j++)) {
++usedAggCallCount;
}
}

// Offset due to the number of system fields having changed.
Mapping mapping =
Mappings.create(
MappingType.INVERSE_SURJECTION,
rowType.getFieldCount(),
groupCount + usedAggCallCount);

final ImmutableBitSet newGroupSet =
Mappings.apply(inputMapping, aggregate.getGroupSet());

final ImmutableList<ImmutableBitSet> newGroupSets =
ImmutableList.copyOf(
Util.transform(aggregate.getGroupSets(),
input1 -> Mappings.apply(inputMapping, input1)));

// Populate mapping of where to find the fields. System, group key and
// indicator fields first.
for (j = 0; j < groupCount; j++) {
mapping.set(j, j);
}

// Now create new agg calls, and populate mapping for them.
relBuilder.push(newInput);
final List<RelBuilder.AggCall> newAggCallList = new ArrayList<>();
j = groupCount;
for (AggregateCall aggCall : aggregate.getAggCallList()) {
if (fieldsUsed.get(j)) {
mapping.set(j, groupCount + newAggCallList.size());
newAggCallList.add(relBuilder.aggregateCall(aggCall, inputMapping));
}
++j;
}

if (newAggCallList.isEmpty() && newGroupSet.isEmpty()) {
// Add a dummy call if all the column fields have been trimmed
mapping = Mappings.create(
MappingType.INVERSE_SURJECTION,
mapping.getSourceCount(),
1);
newAggCallList.add(relBuilder.count(false, "DUMMY"));
}

final RelBuilder.GroupKey groupKey = relBuilder.groupKey(newGroupSet, newGroupSets);
relBuilder.aggregate(groupKey, newAggCallList);

final RelNode newAggregate = RelOptUtil.propagateRelHints(aggregate, relBuilder.build());
return result(newAggregate, mapping);
// final RelDataType rowType = aggregate.getRowType();
//
// // Compute which input fields are used.
// // 1. group fields are always used
// final ImmutableBitSet.Builder inputFieldsUsed =
// aggregate.getGroupSet().rebuild();
// // 2. agg functions
// for (AggregateCall aggCall : aggregate.getAggCallList()) {
// inputFieldsUsed.addAll(aggCall.getArgList());
// if (aggCall.filterArg >= 0) {
// inputFieldsUsed.set(aggCall.filterArg);
// }
// if (aggCall.distinctKeys != null) {
// inputFieldsUsed.addAll(aggCall.distinctKeys);
// }
// inputFieldsUsed.addAll(RelCollations.ordinals(aggCall.collation));
// }
//
// // Create input with trimmed columns.
// final RelNode input = aggregate.getInput();
// final Set<RelDataTypeField> inputExtraFields = Collections.emptySet();
// final TrimResult trimResult =
// trimChild(aggregate, input, inputFieldsUsed.build(), inputExtraFields);
// final RelNode newInput = trimResult.left;
// final Mapping inputMapping = trimResult.right;
// // We have to return group keys and (if present) indicators.
// // So, pretend that the consumer asked for them.
// final int groupCount = aggregate.getGroupSet().cardinality();
// fieldsUsed =
// fieldsUsed.union(ImmutableBitSet.range(groupCount));
//
// // If the input is unchanged, and we need to project all columns,
// // there's nothing to do.
// if (input == newInput
// && fieldsUsed.equals(ImmutableBitSet.range(rowType.getFieldCount()))) {
// return result(aggregate,
// Mappings.createIdentity(rowType.getFieldCount()));
// }
//
// // Which agg calls are used by our consumer?
// int j = groupCount;
// int usedAggCallCount = 0;
// for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
// if (fieldsUsed.get(j++)) {
// ++usedAggCallCount;
// }
// }
//
// // Offset due to the number of system fields having changed.
// Mapping mapping =
// Mappings.create(
// MappingType.INVERSE_SURJECTION,
// rowType.getFieldCount(),
// groupCount + usedAggCallCount);
//
// final ImmutableBitSet newGroupSet =
// Mappings.apply(inputMapping, aggregate.getGroupSet());
//
// final ImmutableList<ImmutableBitSet> newGroupSets =
// ImmutableList.copyOf(
// Util.transform(aggregate.getGroupSets(),
// input1 -> Mappings.apply(inputMapping, input1)));
//
// // Populate mapping of where to find the fields. System, group key and
// // indicator fields first.
// for (j = 0; j < groupCount; j++) {
// mapping.set(j, j);
// }
//
// // Now create new agg calls, and populate mapping for them.
// relBuilder.push(newInput);
// final List<RelBuilder.AggCall> newAggCallList = new ArrayList<>();
// j = groupCount;
// for (AggregateCall aggCall : aggregate.getAggCallList()) {
// if (fieldsUsed.get(j)) {
// mapping.set(j, groupCount + newAggCallList.size());
// newAggCallList.add(relBuilder.aggregateCall(aggCall, inputMapping));
// }
// ++j;
// }
//
// if (newAggCallList.isEmpty() && newGroupSet.isEmpty()) {
// // Add a dummy call if all the column fields have been trimmed
// mapping = Mappings.create(
// MappingType.INVERSE_SURJECTION,
// mapping.getSourceCount(),
// 1);
// newAggCallList.add(relBuilder.count(false, "DUMMY"));
// }
//
// final RelBuilder.GroupKey groupKey = relBuilder.groupKey(newGroupSet, newGroupSets);
// relBuilder.aggregate(groupKey, newAggCallList);
//
// final RelNode newAggregate = RelOptUtil.propagateRelHints(aggregate, relBuilder.build());
// return result(newAggregate, mapping);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ systemProp.org.gradle.internal.publish.checksums.insecure=true
# This is version for Calcite itself
# Note: it should not include "-SNAPSHOT" as it is automatically added by build.gradle.kts
# Release version can be generated by using -Prelease or -Prc=<int> arguments
calcite.version=1.30.0-kylin-4.x-r10
calcite.version=1.30.0-kylin-4.x-r11
# This is a version to be used from Maven repository. It can be overridden by localAvatica below
calcite.avatica.version=1.22.0

Expand Down

0 comments on commit 4342732

Please sign in to comment.