Skip to content

Commit

Permalink
[nereids] group by key elimination
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjian.xzj authored and zhongjian.xzj committed Feb 2, 2024
1 parent 2766391 commit f60f3f3
Show file tree
Hide file tree
Showing 19 changed files with 0 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
* Block fd propagation, it always returns an empty fd
*/
public interface BlockFuncDepsPropagation extends LogicalPlan {
@Override
default FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
return FunctionalDependencies.EMPTY_FUNC_DEPS;
}

@Override
default ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
return ImmutableSet.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,6 @@
* Propagate fd, keep children's fd
*/
public interface PropagateFuncDeps extends LogicalPlan {
@Override
default FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
if (children().size() == 1) {
// Note when changing function dependencies, we always clone it.
// So it's safe to return a reference
return child(0).getLogicalProperties().getFunctionalDependencies();
}
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder();
children().stream()
.map(p -> p.getLogicalProperties().getFunctionalDependencies())
.forEach(builder::addFunctionalDependencies);
return builder.build();
}

@Override
default ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
if (children().size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,52 +320,6 @@ && child().getLogicalProperties().getFunctionalDependencies().isUniqueAndNotNull
}
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies childFd = child(0).getLogicalProperties().getFunctionalDependencies();
Set<Slot> outputSet = new HashSet<>(outputSupplier.get());
Builder fdBuilder = new Builder();
// when group by all tuples, the result only have one row
if (groupByExpressions.isEmpty()) {
outputSet.forEach(s -> {
fdBuilder.addUniformSlot(s);
fdBuilder.addUniqueSlot(s);
});
return fdBuilder.build();
}

// when group by complicate expression or virtual slot, just propagate uniform slots
if (groupByExpressions.stream()
.anyMatch(s -> !(s instanceof SlotReference) || s instanceof VirtualSlotReference)) {
fdBuilder.addUniformSlot(childFd);
fdBuilder.pruneSlots(outputSet);
return fdBuilder.build();
}

// when group by uniform slot, the result only have one row
ImmutableSet<Slot> groupByKeys = groupByExpressions.stream()
.map(s -> (Slot) s)
.collect(ImmutableSet.toImmutableSet());
if (childFd.isUniformAndNotNull(groupByKeys)) {
outputSupplier.get().forEach(s -> {
fdBuilder.addUniformSlot(s);
fdBuilder.addUniqueSlot(s);
});
}

// when group by unique slot, the result depends on agg func
if (childFd.isUniqueAndNotNull(groupByKeys)) {
for (NamedExpression namedExpression : getOutputExpressions()) {
updateFuncDepsGroupByUnique(namedExpression, fdBuilder);
}
}

// group by keys is unique
fdBuilder.addUniqueSlot(groupByKeys);
fdBuilder.pruneSlots(outputSet);
return fdBuilder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet.Builder<FdItem> builder = ImmutableSet.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,42 +126,6 @@ public String qualifiedName() {
return Utils.qualifiedName(qualifier, table.getName());
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
Builder fdBuilder = new Builder();
Set<Slot> output = ImmutableSet.copyOf(outputSupplier.get());
if (table instanceof OlapTable && ((OlapTable) table).getKeysType().isAggregationFamily()) {
ImmutableSet<Slot> slotSet = output.stream()
.filter(SlotReference.class::isInstance)
.map(SlotReference.class::cast)
.filter(s -> s.getColumn().isPresent()
&& s.getColumn().get().isKey())
.collect(ImmutableSet.toImmutableSet());
fdBuilder.addUniqueSlot(slotSet);
}
table.getPrimaryKeyConstraints().forEach(c -> {
Set<Column> columns = c.getPrimaryKeys(this.getTable());
ImmutableSet<Slot> slotSet = output.stream()
.filter(SlotReference.class::isInstance)
.map(SlotReference.class::cast)
.filter(s -> s.getColumn().isPresent()
&& columns.contains(s.getColumn().get()))
.collect(ImmutableSet.toImmutableSet());
fdBuilder.addUniqueSlot(slotSet);
});
table.getUniqueConstraints().forEach(c -> {
Set<Column> columns = c.getUniqueKeys(this.getTable());
ImmutableSet<Slot> slotSet = output.stream()
.filter(SlotReference.class::isInstance)
.map(SlotReference.class::cast)
.filter(s -> s.getColumn().isPresent()
&& columns.contains(s.getColumn().get()))
.collect(ImmutableSet.toImmutableSet());
fdBuilder.addUniqueSlot(slotSet);
});
return fdBuilder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
Set<NamedExpression> output = ImmutableSet.copyOf(outputSupplier.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,6 @@ public List<Slot> computeOutput() {
.collect(ImmutableList.toImmutableList());
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies fd = child(0).getLogicalProperties().getFunctionalDependencies();
if (getLimit() == 1) {
Builder builder = new Builder();
List<Slot> output = outputSupplier.get();
output.forEach(builder::addUniformSlot);
output.forEach(builder::addUniqueSlot);
fd = builder.build();
}
return fd;
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet<FdItem> fdItems = child(0).getLogicalProperties().getFdItems();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,6 @@ public LogicalExcept withNewOutputs(List<NamedExpression> newOutputs) {
Optional.empty(), Optional.empty(), children);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies.Builder builder = new FunctionalDependencies
.Builder(child(0).getLogicalProperties().getFunctionalDependencies());
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> output = outputSupplier.get();
List<? extends Slot> originalOutputs = regularChildrenOutputs.isEmpty()
? child(0).getOutput()
: regularChildrenOutputs.get(0);
for (int i = 0; i < output.size(); i++) {
replaceMap.put(originalOutputs.get(i), output.get(i));
}
builder.replace(replaceMap);
if (qualifier == Qualifier.DISTINCT) {
builder.addUniqueSlot(ImmutableSet.copyOf(outputSupplier.get()));
}
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
Set<NamedExpression> output = ImmutableSet.copyOf(outputSupplier.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,6 @@ public LogicalFilter<Plan> withConjunctsAndChild(Set<Expression> conjuncts, Plan
return new LogicalFilter<>(conjuncts, child);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
Builder fdBuilder = new Builder(
child().getLogicalProperties().getFunctionalDependencies());
getConjuncts().forEach(e -> fdBuilder.addUniformSlot(ExpressionUtils.extractUniformSlot(e)));
return fdBuilder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet.Builder<FdItem> builder = ImmutableSet.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ public int hashCode() {
return Objects.hash(generators, generatorOutput);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder();
builder.addUniformSlot(child(0).getLogicalProperties().getFunctionalDependencies());
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
return ImmutableSet.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,6 @@ public boolean equals(Object object) {
return conjuncts.equals(other.conjuncts);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
Builder fdBuilder = new Builder(
child().getLogicalProperties().getFunctionalDependencies());
getConjuncts().forEach(e -> fdBuilder.addUniformSlot(ExpressionUtils.extractUniformSlot(e)));
return fdBuilder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet.Builder<FdItem> builder = ImmutableSet.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,6 @@ void replaceSlotInFuncDeps(FunctionalDependencies.Builder builder,
builder.replace(replaceMap);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder();
for (Plan child : children) {
builder.addFunctionalDependencies(
child.getLogicalProperties().getFunctionalDependencies());
replaceSlotInFuncDeps(builder, child.getOutput(), outputSupplier.get());
}
if (qualifier == Qualifier.DISTINCT) {
builder.addUniqueSlot(ImmutableSet.copyOf(outputSupplier.get()));
}
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
Set<NamedExpression> output = ImmutableSet.copyOf(outputSupplier.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,73 +611,6 @@ private ImmutableSet<TableIf> getTableList(LogicalPlan plan) {
return resultSet;
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
if (isMarkJoin()) {
// TODO disable function dependence calculation for mark join, but need re-think this in future.
return FunctionalDependencies.EMPTY_FUNC_DEPS;
}
//1. NALAJ and FOJ block functional dependencies
if (joinType.isNullAwareLeftAntiJoin() || joinType.isFullOuterJoin()) {
return FunctionalDependencies.EMPTY_FUNC_DEPS;
}

// left/right semi/anti join propagate left/right functional dependencies
if (joinType.isLeftAntiJoin() || joinType.isLefSemiJoin()) {
return left().getLogicalProperties().getFunctionalDependencies();
}
if (joinType.isRightSemiJoin() || joinType.isRightAntiJoin()) {
return right().getLogicalProperties().getFunctionalDependencies();
}

// if there is non-equal join conditions, block functional dependencies
if (!otherJoinConjuncts.isEmpty()) {
return FunctionalDependencies.EMPTY_FUNC_DEPS;
}

Pair<Set<Slot>, Set<Slot>> keys = extractNullRejectHashKeys();
if (keys == null) {
return FunctionalDependencies.EMPTY_FUNC_DEPS;
}

// Note here we only check whether the left is unique.
// So the hash condition can't be null-safe
// TODO: consider Null-safe hash condition when left and rigth is not nullable
boolean isLeftUnique = left().getLogicalProperties()
.getFunctionalDependencies().isUnique(keys.first);
boolean isRightUnique = right().getLogicalProperties()
.getFunctionalDependencies().isUnique(keys.second);
Builder fdBuilder = new Builder();
if (joinType.isInnerJoin()) {
// inner join propagate uniforms slots
// And if the hash keys is unique, inner join can propagate all functional dependencies
if (isLeftUnique && isRightUnique) {
fdBuilder.addFunctionalDependencies(left().getLogicalProperties().getFunctionalDependencies());
fdBuilder.addFunctionalDependencies(right().getLogicalProperties().getFunctionalDependencies());
} else {
fdBuilder.addUniformSlot(left().getLogicalProperties().getFunctionalDependencies());
fdBuilder.addUniformSlot(right().getLogicalProperties().getFunctionalDependencies());
}
}

// left/right outer join propagate left/right uniforms slots
// And if the right/left hash keys is unique,
// join can propagate left/right functional dependencies
if (joinType.isLeftOuterJoin()) {
if (isRightUnique) {
return left().getLogicalProperties().getFunctionalDependencies();
}
fdBuilder.addUniformSlot(left().getLogicalProperties().getFunctionalDependencies());
}
if (joinType.isRightOuterJoin()) {
if (isLeftUnique) {
return left().getLogicalProperties().getFunctionalDependencies();
}
fdBuilder.addUniformSlot(left().getLogicalProperties().getFunctionalDependencies());
}
return fdBuilder.build();
}

/**
* get Equal slot from join
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,6 @@ public LogicalLimit<Plan> withChildren(List<Plan> children) {
return new LogicalLimit<>(limit, offset, phase, children.get(0));
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies fd = child(0).getLogicalProperties().getFunctionalDependencies();
if (getLimit() == 1 && !phase.isLocal()) {
Builder builder = new Builder();
outputSupplier.get().forEach(builder::addUniformSlot);
outputSupplier.get().forEach(builder::addUniqueSlot);
fd = builder.build();
}
return fd;
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet<FdItem> fdItems = child(0).getLogicalProperties().getFdItems();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,6 @@ public Plan pruneOutputs(List<NamedExpression> prunedOutputs) {
return withProjects(prunedOutputs);
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder();
outputSupplier.get().forEach(s -> {
builder.addUniformSlot(s);
builder.addUniqueSlot(s);
});
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
Set<NamedExpression> output = ImmutableSet.copyOf(outputSupplier.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,28 +228,6 @@ public JSONObject toJson() {
return logicalProject;
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies childFuncDeps = child().getLogicalProperties().getFunctionalDependencies();
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder(childFuncDeps);
builder.pruneSlots(new HashSet<>(outputSupplier.get()));
projects.stream().filter(Alias.class::isInstance).forEach(proj -> {
if (proj.child(0).isConstant()) {
builder.addUniformSlot(proj.toSlot());
} else if (proj.child(0) instanceof Uuid) {
builder.addUniqueSlot(proj.toSlot());
} else if (ExpressionUtils.isInjective(proj.child(0))) {
ImmutableSet<Slot> inputs = ImmutableSet.copyOf(proj.getInputSlots());
if (childFuncDeps.isUnique(inputs)) {
builder.addUniqueSlot(proj.toSlot());
} else if (childFuncDeps.isUniform(inputs)) {
builder.addUniformSlot(proj.toSlot());
}
}
});
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet.Builder<FdItem> builder = ImmutableSet.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,6 @@ public boolean canBindVirtualSlot() {
.noneMatch(output -> output.containsType(VirtualSlotReference.class));
}

@Override
public FunctionalDependencies computeFuncDeps(Supplier<List<Slot>> outputSupplier) {
FunctionalDependencies.Builder builder = new FunctionalDependencies.Builder();
// Note uniform does not reject nullable slots
outputSupplier.get().stream()
.filter(child(0).getLogicalProperties().getFunctionalDependencies()::isUniform)
.forEach(builder::addUniformSlot);
return builder.build();
}

@Override
public ImmutableSet<FdItem> computeFdItems(Supplier<List<Slot>> outputSupplier) {
ImmutableSet.Builder<FdItem> builder = ImmutableSet.builder();
Expand Down
Loading

0 comments on commit f60f3f3

Please sign in to comment.