diff --git a/runtime/vam/expr/agg/avg.go b/runtime/vam/expr/agg/avg.go index 214215f776..7c17bd589f 100644 --- a/runtime/vam/expr/agg/avg.go +++ b/runtime/vam/expr/agg/avg.go @@ -43,13 +43,15 @@ func (a *avg) ConsumeAsPartial(partial vector.Any) { if !ok1 || !ok2 { panic("avg: invalid partial") } - sumVal, ok1 := rec.Fields[si].(*vector.Const) - countVal, ok2 := rec.Fields[ci].(*vector.Const) - if !ok1 || !ok2 || sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 { + sumVal := rec.Fields[si] + countVal := rec.Fields[ci] + if sumVal.Type() != super.TypeFloat64 || countVal.Type() != super.TypeUint64 { panic("avg: invalid partial") } - a.sum += sumVal.Value().Float() - a.count += countVal.Value().Uint() + sum, _ := vector.FloatValue(sumVal, 0) + count, _ := vector.UintValue(countVal, 0) + a.sum += sum + a.count += count } func (a *avg) ResultAsPartial(zctx *super.Context) super.Value { diff --git a/runtime/vam/expr/agg/count.go b/runtime/vam/expr/agg/count.go index 9272efabc7..d25f9d4fbb 100644 --- a/runtime/vam/expr/agg/count.go +++ b/runtime/vam/expr/agg/count.go @@ -37,11 +37,11 @@ func (a *count) Result(*super.Context) super.Value { } func (a *count) ConsumeAsPartial(partial vector.Any) { - c, ok := partial.(*vector.Const) - if !ok || c.Len() != 1 || partial.Type() != super.TypeUint64 { + if partial.Len() != 1 || partial.Type() != super.TypeUint64 { panic("count: bad partial") } - a.count += c.Value().Uint() + count, _ := vector.UintValue(partial, 0) + a.count += count } func (a *count) ResultAsPartial(*super.Context) super.Value { diff --git a/runtime/vam/op/summarize/agg.go b/runtime/vam/op/summarize/agg.go index a3fa2a2b20..f592e72dae 100644 --- a/runtime/vam/op/summarize/agg.go +++ b/runtime/vam/op/summarize/agg.go @@ -24,7 +24,8 @@ type superTable struct { builder *vector.RecordBuilder partialsIn bool partialsOut bool - table map[string]aggRow + table map[string]int + rows []aggRow zctx *super.Context } @@ -52,11 +53,13 @@ func (s *superTable) update(keys []vector.Any, args []vector.Any) { m[string(keyBytes)] = append(m[string(keyBytes)], slot) } for rowKey, index := range m { - row, ok := s.table[rowKey] + id, ok := s.table[rowKey] if !ok { - row = s.newRow(keys, index) - s.table[rowKey] = row + id = len(s.rows) + s.table[rowKey] = id + s.rows = append(s.rows, s.newRow(keys, index)) } + row := s.rows[id] for i, arg := range args { if len(m) > 1 { arg = vector.NewView(arg, index) @@ -85,32 +88,40 @@ func (s *superTable) newRow(keys []vector.Any, index []uint32) aggRow { } func (s *superTable) materialize() vector.Any { + if len(s.rows) == 0 { + return vector.NewConst(super.Null, 0, nil) + } var vecs []vector.Any - var tags []uint32 - // XXX This should reasonably concat all materialize rows together instead - // of this crazy Dynamic hack. - for _, row := range s.table { - tags = append(tags, uint32(len(tags))) - vecs = append(vecs, s.materializeRow(row)) + for i := range s.rows[0].keys { + vecs = append(vecs, s.materializeKey(i)) + } + for i := range s.rows[0].funcs { + vecs = append(vecs, s.materializeAgg(i)) } - return vector.NewDynamic(tags, vecs) + // Since aggs can return dynamic values need to do apply to create record. + return vector.Apply(false, func(vecs ...vector.Any) vector.Any { + return s.builder.New(vecs) + }, vecs...) } -func (s *superTable) materializeRow(row aggRow) vector.Any { - var vecs []vector.Any - for _, key := range row.keys { - vecs = append(vecs, vector.NewConst(key, 1, nil)) +func (s *superTable) materializeKey(i int) vector.Any { + b := vector.NewBuilder(s.rows[0].keys[i].Type()) + for _, row := range s.rows { + b.Write(row.keys[i].Bytes()) } - for _, fn := range row.funcs { - var val super.Value + return b.Build() +} + +func (s *superTable) materializeAgg(i int) vector.Any { + b := vector.NewDynamicBuilder() + for _, row := range s.rows { if s.partialsOut { - val = fn.ResultAsPartial(s.zctx) + b.Write(row.funcs[i].ResultAsPartial(s.zctx)) } else { - val = fn.Result(s.zctx) + b.Write(row.funcs[i].Result(s.zctx)) } - vecs = append(vecs, vector.NewConst(val, 1, nil)) } - return s.builder.New(vecs) + return b.Build() } type countByString struct { diff --git a/runtime/vam/op/summarize/summarize.go b/runtime/vam/op/summarize/summarize.go index 62d3d42204..0f85009da9 100644 --- a/runtime/vam/op/summarize/summarize.go +++ b/runtime/vam/op/summarize/summarize.go @@ -111,7 +111,7 @@ func (s *Summarize) newAggTable(keyTypes []super.Type) aggTable { builder: s.builder, partialsIn: s.partialsIn, partialsOut: s.partialsOut, - table: make(map[string]aggRow), + table: make(map[string]int), zctx: s.zctx, } } diff --git a/vector/builder.go b/vector/builder.go index a65c2f9d50..a9b7847937 100644 --- a/vector/builder.go +++ b/vector/builder.go @@ -14,6 +14,41 @@ type Builder interface { Build() Any } +type DynamicBuilder struct { + tags []uint32 + values []Builder + which map[super.Type]int +} + +func NewDynamicBuilder() *DynamicBuilder { + return &DynamicBuilder{ + which: make(map[super.Type]int), + } +} + +func (d *DynamicBuilder) Write(val super.Value) { + typ := val.Type() + tag, ok := d.which[typ] + if !ok { + tag = len(d.values) + d.values = append(d.values, NewBuilder(typ)) + d.which[typ] = tag + } + d.tags = append(d.tags, uint32(tag)) + d.values[tag].Write(val.Bytes()) +} + +func (d *DynamicBuilder) Build() Any { + var vecs []Any + for _, b := range d.values { + vecs = append(vecs, b.Build()) + } + if len(vecs) == 1 { + return vecs[0] + } + return NewDynamic(d.tags, vecs) +} + func NewBuilder(typ super.Type) Builder { var b Builder switch typ := typ.(type) {