Skip to content

Commit

Permalink
Add map function
Browse files Browse the repository at this point in the history
The map function that applies a function to every element in an array or set
value.

Also: rename the map aggregator to collect_map to avoid collision with
this new function.

Closes #4610
  • Loading branch information
mattnibs committed Oct 12, 2023
1 parent d67999d commit c5a2511
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 17 deletions.
6 changes: 6 additions & 0 deletions compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type (
Kind string `json:"kind" unpack:""`
Value string `json:"value"`
}
MapCall struct {
Kind string `json:"kind" unpack:""`
Expr Expr `json:"expr"`
Inner Expr `json:"inner"`
}
MapExpr struct {
Kind string `json:"kind" unpack:""`
Entries []Entry `json:"entries"`
Expand Down Expand Up @@ -121,6 +126,7 @@ func (*Conditional) ExprDAG() {}
func (*Dot) ExprDAG() {}
func (*Func) ExprDAG() {}
func (*Literal) ExprDAG() {}
func (*MapCall) ExprDAG() {}
func (*MapExpr) ExprDAG() {}
func (*OverExpr) ExprDAG() {}
func (*RecordExpr) ExprDAG() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var unpacker = unpack.New(
Lister{},
Literal{},
Load{},
MapCall{},
MapExpr{},
Merge{},
Over{},
Expand Down
14 changes: 14 additions & 0 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (b *Builder) compileExpr(e dag.Expr) (expr.Evaluator, error) {
return b.compileArrayExpr(e)
case *dag.SetExpr:
return b.compileSetExpr(e)
case *dag.MapCall:
return b.compileMapCall(e)
case *dag.MapExpr:
return b.compileMapExpr(e)
case *dag.Agg:
Expand Down Expand Up @@ -309,6 +311,18 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) {
return expr.NewCall(b.zctx(), fn, exprs), nil
}

func (b *Builder) compileMapCall(a *dag.MapCall) (expr.Evaluator, error) {
e, err := b.compileExpr(a.Expr)
if err != nil {
return nil, err
}
inner, err := b.compileExpr(a.Inner)
if err != nil {
return nil, err
}
return expr.NewMapFunc(b.zctx(), e, inner), nil
}

func (b *Builder) compileShaper(node dag.Call, tf expr.ShaperTransform) (expr.Evaluator, error) {
args := node.Args
field, err := b.compileExpr(args[0])
Expand Down
22 changes: 22 additions & 0 deletions compiler/semantic/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,28 @@ func (a *analyzer) semCall(call *ast.Call) (dag.Expr, error) {
if nargs == 1 {
exprs = append([]dag.Expr{&dag.This{Kind: "This"}}, exprs...)
}
case name == "map":
if err := function.CheckArgCount(nargs, 2, 2); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
}
id, ok := call.Args[1].(*ast.ID)
if !ok {
return nil, fmt.Errorf("%s(): second argument must be the identifier of a function", name)
}
// Validate that the called func takes a single argument
inner, err := a.semCall(&ast.Call{
Kind: "Call",
Name: id.Name,
Args: []ast.Expr{&ast.ID{Kind: "ID", Name: "this"}},
})
if err != nil {
return nil, err
}
return &dag.MapCall{
Kind: "MapCall",
Expr: exprs[0],
Inner: inner,
}, nil
default:
if _, _, err = function.New(a.zctx, name, nargs); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
### Aggregate Function

  **map** — aggregate map values into a single map
  **collect_map** — aggregate map values into a single map

### Synopsis
```
map(|{any:any}|) -> |{any:any}|
collect_map(|{any:any}|) -> |{any:any}|
```

### Description

The _map_ aggregate function combines map inputs into a single map output.
If _map_ receives multiple values for the same key, the last value received is
The _collect_map_ aggregate function combines map inputs into a single map output.
If _collect_map_ receives multiple values for the same key, the last value received is
retained. If the input keys or values vary in type, the return type will be a map
of union of those types.

### Examples

Combine a sequence of records into a map:
```mdtest-command
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{stock:price}|)' -
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'collect_map(|{stock:price}|)' -
```
=>
```mdtest-output
Expand All @@ -27,7 +27,7 @@ echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{sto

Continuous collection over a simple sequence:
```mdtest-command
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield map(this)' -
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield collect_map(this)' -
```
=>
```mdtest-output
Expand Down
4 changes: 2 additions & 2 deletions runtime/expr/agg/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func NewPattern(op string, hasarg bool) (Pattern, error) {
pattern = func() Function {
return newMathReducer(anymath.Add)
}
case "map":
case "collect_map":
pattern = func() Function {
return newMap()
return newCollectMap()
}
case "min":
pattern = func() Function {
Expand Down
14 changes: 7 additions & 7 deletions runtime/expr/agg/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"golang.org/x/exp/slices"
)

type Map struct {
type CollectMap struct {
entries map[string]mapEntry
scratch []byte
}

func newMap() *Map {
return &Map{entries: make(map[string]mapEntry)}
func newCollectMap() *CollectMap {
return &CollectMap{entries: make(map[string]mapEntry)}
}

var _ Function = (*Collect)(nil)
Expand All @@ -22,7 +22,7 @@ type mapEntry struct {
val *zed.Value
}

func (m *Map) Consume(val *zed.Value) {
func (m *CollectMap) Consume(val *zed.Value) {
if val.IsNull() {
return
}
Expand All @@ -43,11 +43,11 @@ func (m *Map) Consume(val *zed.Value) {
}
}

func (m *Map) ConsumeAsPartial(val *zed.Value) {
func (m *CollectMap) ConsumeAsPartial(val *zed.Value) {
m.Consume(val)
}

func (m *Map) Result(zctx *zed.Context) *zed.Value {
func (m *CollectMap) Result(zctx *zed.Context) *zed.Value {
if len(m.entries) == 0 {
return zed.Null
}
Expand All @@ -71,7 +71,7 @@ func (m *Map) Result(zctx *zed.Context) *zed.Value {
return zed.NewValue(typ, b)
}

func (m *Map) ResultAsPartial(zctx *zed.Context) *zed.Value {
func (m *CollectMap) ResultAsPartial(zctx *zed.Context) *zed.Value {
return m.Result(zctx)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{k:v}|)"
zed: "collect_map(|{k:v}|)"

output-flags: -pretty 2

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{stock: price}|)"
zed: "collect_map(|{stock: price}|)"

output-flags: -pretty 2

Expand Down
71 changes: 71 additions & 0 deletions runtime/expr/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/zcode"
)

type mapFunc struct {
builder zcode.Builder
eval Evaluator
inner Evaluator
zctx *zed.Context

// vals is used to reduce allocations
vals []zed.Value
// types is used to reduce allocations
types []zed.Type
}

func NewMapFunc(zctx *zed.Context, e, inner Evaluator) Evaluator {
return &mapFunc{eval: e, inner: inner, zctx: zctx}
}

func (a *mapFunc) Eval(ectx Context, in *zed.Value) *zed.Value {
v := a.eval.Eval(ectx, in)
if v.IsError() {
return v
}
elems, err := v.Elements()
if err != nil {
return ectx.CopyValue(*a.zctx.WrapError(err.Error(), in))
}
if len(elems) == 0 {
return v
}
a.vals = a.vals[:0]
a.types = a.types[:0]
for _, elem := range elems {
out := a.inner.Eval(ectx, &elem)
a.vals = append(a.vals, *out)
a.types = append(a.types, out.Type)
}
inner := a.innerType(a.types)
bytes := a.buildVal(inner, a.vals)
if _, ok := zed.TypeUnder(in.Type).(*zed.TypeSet); ok {
return ectx.NewValue(a.zctx.LookupTypeSet(inner), zed.NormalizeSet(bytes))
}
return ectx.NewValue(a.zctx.LookupTypeArray(inner), bytes)
}

func (a *mapFunc) buildVal(inner zed.Type, vals []zed.Value) []byte {
a.builder.Reset()
if union, ok := inner.(*zed.TypeUnion); ok {
for _, val := range a.vals {
zed.BuildUnion(&a.builder, union.TagOf(val.Type), val.Bytes())
}
} else {
for _, val := range a.vals {
a.builder.Append(val.Bytes())
}
}
return a.builder.Bytes()
}

func (a *mapFunc) innerType(types []zed.Type) zed.Type {
types = zed.UniqueTypes(types)
if len(types) == 1 {
return types[0]
}
return a.zctx.LookupTypeUnion(types)
}
8 changes: 8 additions & 0 deletions runtime/expr/ztests/map.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
script: |
echo '{a:["foo","bar","baz"]}' | zq -z 'a := map(a,upper)' -
echo '["1","2","3"]' | zq -z 'yield map(this,int64)' -
outputs:
- name: stdout
data: |
{a:["FOO","BAR","BAZ"]}
[1,2,3]

0 comments on commit c5a2511

Please sign in to comment.