diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go index fd6e9ceb66..c67c370324 100644 --- a/compiler/optimizer/optimizer.go +++ b/compiler/optimizer/optimizer.go @@ -238,7 +238,7 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) { } lister.KeyPruner = maybeNewRangePruner(filter, sortKeys) seq = dag.Seq{lister} - _, _, orderRequired, _, err := o.concurrentPath(chain, sortKeys) + _, _, orderRequired, err := o.concurrentPath(chain, sortKeys) if err != nil { return nil, err } diff --git a/compiler/optimizer/parallelize.go b/compiler/optimizer/parallelize.go index 4f5b6c7fbb..e6d3c02e2e 100644 --- a/compiler/optimizer/parallelize.go +++ b/compiler/optimizer/parallelize.go @@ -37,7 +37,7 @@ func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, seq dag.Seq, replicas } // concurrentPath will check that the path consisting of the original source // sequence and any lifted sequence is still parallelizable. - n, outputKeys, _, needMerge, err := o.concurrentPath(seq[1:], srcSortKeys) + n, outputKeys, _, err := o.concurrentPath(seq[1:], srcSortKeys) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (o *Optimizer) parallelizeSeqScan(scan *dag.SeqScan, seq dag.Seq, replicas scatter.Paths[k] = copySeq(head) } var merge dag.Op - if needMerge { + if len(outputKeys) > 0 { // At this point, we always insert a merge as we don't know if the // downstream DAG requires the sort order. A later step will look at // the fanin from this parallel structure and see if the merge can be @@ -204,17 +204,17 @@ func parallelPaths(op dag.Op) ([]dag.Seq, bool) { return nil, false } -// concurrentPath returns the largest path within ops from front to end that can +// concurrentPath returns the largest path within seq from front to end that can // be parallelized and run concurrently while preserving its semantics where -// the input to ops is known to have an order defined by sortKey (or order.Nil +// the input to seq is known to have an order defined by sortKey (or order.Nil // if unknown). // The length of the concurrent path is returned and the sort order at // exit from that path is returned. If sortKey is zero, then the // concurrent path is allowed to include operators that do not guarantee // an output order. -func (o *Optimizer) concurrentPath(ops []dag.Op, sortKeys order.SortKeys) (int, order.SortKeys, bool, bool, error) { - for k := range ops { - switch op := ops[k].(type) { +func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length int, outputKeys order.SortKeys, orderRequired bool, err error) { + for k := range seq { + switch op := seq[k].(type) { // This should be a boolean in op.go that defines whether // function can be parallelized... need to think through // what the meaning is here exactly. This is all still a bit @@ -226,35 +226,35 @@ func (o *Optimizer) concurrentPath(ops []dag.Op, sortKeys order.SortKeys) (int, if isKeyOfSummarize(op, sortKeys) { // Keep the input ordered so we can incrementally release // results from the groupby as a streaming operation. - return k, sortKeys, true, true, nil + return k, sortKeys, true, nil } - return k, nil, false, false, nil + return k, nil, false, nil case *dag.Sort: newKeys := sortKeysOfSort(op) if newKeys.IsNil() { // No analysis for sort without expression since we can't // parallelize the heuristic. We should revisit these semantics // and define a global order across Zed type. - return 0, nil, false, false, nil + return 0, nil, false, nil } - return k, newKeys, false, true, nil + return k, newKeys, false, nil case *dag.Load: // XXX At some point Load should have an optimization where if the // upstream sort is the same as the Load destination sort we // request a merge and set the Load operator to do a sorted write. - return k, nil, false, false, nil + return k, nil, false, nil case *dag.Fork, *dag.Scatter, *dag.Mirror, *dag.Head, *dag.Tail, *dag.Uniq, *dag.Fuse, *dag.Join, *dag.Output: - return k, sortKeys, true, true, nil + return k, sortKeys, true, nil default: next, err := o.analyzeSortKeys(op, sortKeys) if err != nil { - return 0, nil, false, false, err + return 0, nil, false, err } if !sortKeys.IsNil() && next.IsNil() { - return k, sortKeys, true, true, nil + return k, sortKeys, true, nil } sortKeys = next } } - return len(ops), sortKeys, true, true, nil + return len(seq), sortKeys, true, nil }