Skip to content

Commit

Permalink
Remove optimizer.Optimizer.concurrentPath needMerge return param (#5411)
Browse files Browse the repository at this point in the history
The needMerge return parameter is redundant because it should always be
equal to len(outputKeys)==0.  (Intuitively, if len(outputKeys)>0, then
the output is ordered and a dag.Merge is necessary, but if
len(outputKeys)==0, then the output is unordered and a dag.Combine
suffices.)

Note that concurrentPath can currently return needMerge==true when
len(outputKeys)==0 but that behavior is erroneous because a merge
requires a merge key.
  • Loading branch information
nwt authored Nov 1, 2024
1 parent 0d91eae commit ac8b7e0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
}
lister.KeyPruner = maybeNewRangePruner(filter, sortKeys)
seq = dag.Seq{lister}
_, _, orderRequired, _, err := o.concurrentPath(chain, sortKeys)
_, _, orderRequired, err := o.concurrentPath(chain, sortKeys)
if err != nil {
return nil, err
}
Expand Down
32 changes: 16 additions & 16 deletions compiler/optimizer/parallelize.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, seq dag.Seq, replicas
}
// concurrentPath will check that the path consisting of the original source
// sequence and any lifted sequence is still parallelizable.
n, outputKeys, _, needMerge, err := o.concurrentPath(seq[1:], srcSortKeys)
n, outputKeys, _, err := o.concurrentPath(seq[1:], srcSortKeys)
if err != nil {
return nil, err
}
Expand All @@ -55,7 +55,7 @@ func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, seq dag.Seq, replicas
scatter.Paths[k] = copySeq(head)
}
var merge dag.Op
if needMerge {
if len(outputKeys) > 0 {
// At this point, we always insert a merge as we don't know if the
// downstream DAG requires the sort order. A later step will look at
// the fanin from this parallel structure and see if the merge can be
Expand Down Expand Up @@ -204,17 +204,17 @@ func parallelPaths(op dag.Op) ([]dag.Seq, bool) {
return nil, false
}

// concurrentPath returns the largest path within ops from front to end that can
// concurrentPath returns the largest path within seq from front to end that can
// be parallelized and run concurrently while preserving its semantics where
// the input to ops is known to have an order defined by sortKey (or order.Nil
// the input to seq is known to have an order defined by sortKey (or order.Nil
// if unknown).
// The length of the concurrent path is returned and the sort order at
// exit from that path is returned. If sortKey is zero, then the
// concurrent path is allowed to include operators that do not guarantee
// an output order.
func (o *Optimizer) concurrentPath(ops []dag.Op, sortKeys order.SortKeys) (int, order.SortKeys, bool, bool, error) {
for k := range ops {
switch op := ops[k].(type) {
func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length int, outputKeys order.SortKeys, orderRequired bool, err error) {
for k := range seq {
switch op := seq[k].(type) {
// This should be a boolean in op.go that defines whether
// function can be parallelized... need to think through
// what the meaning is here exactly. This is all still a bit
Expand All @@ -226,35 +226,35 @@ func (o *Optimizer) concurrentPath(ops []dag.Op, sortKeys order.SortKeys) (int,
if isKeyOfSummarize(op, sortKeys) {
// Keep the input ordered so we can incrementally release
// results from the groupby as a streaming operation.
return k, sortKeys, true, true, nil
return k, sortKeys, true, nil
}
return k, nil, false, false, nil
return k, nil, false, nil
case *dag.Sort:
newKeys := sortKeysOfSort(op)
if newKeys.IsNil() {
// No analysis for sort without expression since we can't
// parallelize the heuristic. We should revisit these semantics
// and define a global order across Zed type.
return 0, nil, false, false, nil
return 0, nil, false, nil
}
return k, newKeys, false, true, nil
return k, newKeys, false, nil
case *dag.Load:
// XXX At some point Load should have an optimization where if the
// upstream sort is the same as the Load destination sort we
// request a merge and set the Load operator to do a sorted write.
return k, nil, false, false, nil
return k, nil, false, nil
case *dag.Fork, *dag.Scatter, *dag.Mirror, *dag.Head, *dag.Tail, *dag.Uniq, *dag.Fuse, *dag.Join, *dag.Output:
return k, sortKeys, true, true, nil
return k, sortKeys, true, nil
default:
next, err := o.analyzeSortKeys(op, sortKeys)
if err != nil {
return 0, nil, false, false, err
return 0, nil, false, err
}
if !sortKeys.IsNil() && next.IsNil() {
return k, sortKeys, true, true, nil
return k, sortKeys, true, nil
}
sortKeys = next
}
}
return len(ops), sortKeys, true, true, nil
return len(seq), sortKeys, true, nil
}

0 comments on commit ac8b7e0

Please sign in to comment.