Skip to content

Commit

Permalink
Add map function
Browse files Browse the repository at this point in the history
The map function applies a function to every element in an array or set
value.

Also: rename the map aggregator to collect_map to avoid collision with
this new function.

Closes #4610
  • Loading branch information
mattnibs committed Nov 1, 2023
1 parent 23e411d commit f0f91cf
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
* Add [`regexp()`](docs/language/functions/regexp.md) function for regular expression searches and capture groups (#4145, #4158)
* Add [`coalesce()`](docs/language/functions/coalesce.md) function for locating non-null/non-error values (#4172)
* Add `line` format for sourcing newline-delimited input as strings (#4175)
* Add [`map()` aggregation function](docs/language/aggregates/map.md) for constructing [maps](docs/formats/zed.md#24-map) #4173
* Add [`collect_map()` aggregation function](docs/language/aggregates/collect_map.md) for constructing [maps](docs/formats/zed.md#24-map) #4173

## v1.2.0
* Compress index values (#3974)
Expand Down
6 changes: 6 additions & 0 deletions compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type (
Kind string `json:"kind" unpack:""`
Value string `json:"value"`
}
// MapCall is the DAG node for a call to the map function. Expr is the
// expression for the value whose elements are mapped, and Inner is the
// expression evaluated for each of those elements.
MapCall struct {
Kind string `json:"kind" unpack:""`
Expr Expr `json:"expr"`
Inner Expr `json:"inner"`
}
MapExpr struct {
Kind string `json:"kind" unpack:""`
Entries []Entry `json:"entries"`
Expand Down Expand Up @@ -121,6 +126,7 @@ func (*Conditional) ExprDAG() {}
func (*Dot) ExprDAG() {}
func (*Func) ExprDAG() {}
func (*Literal) ExprDAG() {}
func (*MapCall) ExprDAG() {}
func (*MapExpr) ExprDAG() {}
func (*OverExpr) ExprDAG() {}
func (*RecordExpr) ExprDAG() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var unpacker = unpack.New(
Lister{},
Literal{},
Load{},
MapCall{},
MapExpr{},
Merge{},
Over{},
Expand Down
14 changes: 14 additions & 0 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (b *Builder) compileExpr(e dag.Expr) (expr.Evaluator, error) {
return b.compileArrayExpr(e)
case *dag.SetExpr:
return b.compileSetExpr(e)
case *dag.MapCall:
return b.compileMapCall(e)
case *dag.MapExpr:
return b.compileMapExpr(e)
case *dag.Agg:
Expand Down Expand Up @@ -309,6 +311,18 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) {
return expr.NewCall(b.zctx(), fn, exprs), nil
}

// compileMapCall turns a dag.MapCall into a runtime evaluator. The container
// expression (a.Expr) and the per-element expression (a.Inner) are compiled
// in that order; the first compilation error, if any, is returned as is.
func (b *Builder) compileMapCall(a *dag.MapCall) (expr.Evaluator, error) {
	var (
		container, perElem expr.Evaluator
		err                error
	)
	if container, err = b.compileExpr(a.Expr); err != nil {
		return nil, err
	}
	if perElem, err = b.compileExpr(a.Inner); err != nil {
		return nil, err
	}
	return expr.NewMapCall(b.zctx(), container, perElem), nil
}

func (b *Builder) compileShaper(node dag.Call, tf expr.ShaperTransform) (expr.Evaluator, error) {
args := node.Args
field, err := b.compileExpr(args[0])
Expand Down
21 changes: 21 additions & 0 deletions compiler/semantic/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,27 @@ func (a *analyzer) semCall(call *ast.Call) (dag.Expr, error) {
if nargs == 1 {
exprs = append([]dag.Expr{&dag.This{Kind: "This"}}, exprs...)
}
case name == "map":
if err := function.CheckArgCount(nargs, 2, 2); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
}
id, ok := call.Args[1].(*ast.ID)
if !ok {
return nil, fmt.Errorf("%s(): second argument must be the identifier of a function", name)
}
inner, err := a.semCall(&ast.Call{
Kind: "Call",
Name: id.Name,
Args: []ast.Expr{&ast.ID{Kind: "ID", Name: "this"}},
})
if err != nil {
return nil, err
}
return &dag.MapCall{
Kind: "MapCall",
Expr: exprs[0],
Inner: inner,
}, nil
default:
if _, _, err = function.New(a.zctx, name, nargs); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
Expand Down
2 changes: 1 addition & 1 deletion docs/language/aggregates/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ value for a sequence of input values.
- [any](any.md) - select an arbitrary value from its input
- [avg](avg.md) - average value
- [collect](collect.md) - aggregate values into array
- [collect_map](collect_map.md) - aggregate map values into a single map
- [count](count.md) - count input values
- [dcount](dcount.md) - count distinct input values
- [fuse](fuse.md) - compute a fused type of input values
- [map](map.md) - aggregate map values into a single map
- [max](max.md) - maximum value of input values
- [min](min.md) - minimum value of input values
- [or](or.md) - logical OR of input values
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
### Aggregate Function

  **map** — aggregate map values into a single map
  **collect_map** — aggregate map values into a single map

### Synopsis
```
map(|{any:any}|) -> |{any:any}|
collect_map(|{any:any}|) -> |{any:any}|
```

### Description

The _map_ aggregate function combines map inputs into a single map output.
If _map_ receives multiple values for the same key, the last value received is
The _collect_map_ aggregate function combines map inputs into a single map output.
If _collect_map_ receives multiple values for the same key, the last value received is
retained. If the input keys or values vary in type, the return type will be a map
of union of those types.

### Examples

Combine a sequence of records into a map:
```mdtest-command
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{stock:price}|)' -
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'collect_map(|{stock:price}|)' -
```
=>
```mdtest-output
Expand All @@ -27,7 +27,7 @@ echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{sto

Continuous collection over a simple sequence:
```mdtest-command
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield map(this)' -
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield collect_map(this)' -
```
=>
```mdtest-output
Expand Down
1 change: 1 addition & 0 deletions docs/language/functions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Zed's [primitive types](../../formats/zed.md#1-primitive-types), e.g.,
* [levenshtein](levenshtein.md) Levenshtein distance
* [log](log.md) - natural logarithm
* [lower](lower.md) - convert a string to lower case
* [map](map.md) - apply a function to each element of an array or set
* [missing](missing.md) - test for the "missing" error
* [nameof](nameof.md) - the name of a named type
* [nest_dotted](nest_dotted.md) - transform fields in a record with dotted names to nested records
Expand Down
42 changes: 42 additions & 0 deletions docs/language/functions/map.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
### Function

  **map** — apply a function to each element of an array or set and return the results

### Synopsis

```
map(v: array|set, f: function) -> array|set
```

### Description

The _map_ function applies function `f` to every element in array or set `v`
and returns a new array or set containing the results. Function `f` must take
exactly one argument. A user-defined function can be used for `f`.

### Examples

Upper case each element of an array:

```mdtest-command
echo '["foo","bar","baz"]' | zq -z 'yield map(this, upper)' -
```
=>
```mdtest-output
["FOO","BAR","BAZ"]
```

Using a user-defined function to convert an epoch float to a time:

```mdtest-input udf.zed
func floatToTime(x): ( cast(x*1000000000, <time>) )
yield map(this, floatToTime)
```

```mdtest-command
echo '[1697151533.41415,1697151540.716529]' | zq -z -I udf.zed -
```
=>
```mdtest-output
[2023-10-12T22:58:53.414149888Z,2023-10-12T22:59:00.716528896Z]
```
4 changes: 2 additions & 2 deletions runtime/expr/agg/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func NewPattern(op string, hasarg bool) (Pattern, error) {
pattern = func() Function {
return newMathReducer(anymath.Add)
}
case "map":
case "collect_map":
pattern = func() Function {
return newMap()
return newCollectMap()
}
case "min":
pattern = func() Function {
Expand Down
30 changes: 15 additions & 15 deletions runtime/expr/agg/map.go → runtime/expr/agg/collectmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"golang.org/x/exp/slices"
)

type Map struct {
type CollectMap struct {
entries map[string]mapEntry
scratch []byte
}

func newMap() *Map {
return &Map{entries: make(map[string]mapEntry)}
func newCollectMap() *CollectMap {
return &CollectMap{entries: make(map[string]mapEntry)}
}

// Compile-time check that CollectMap (the type defined in this file)
// implements Function.
var _ Function = (*CollectMap)(nil)
Expand All @@ -22,7 +22,7 @@ type mapEntry struct {
val *zed.Value
}

func (m *Map) Consume(val *zed.Value) {
func (c *CollectMap) Consume(val *zed.Value) {
if val.IsNull() {
return
}
Expand All @@ -36,23 +36,23 @@ func (m *Map) Consume(val *zed.Value) {
keyTagAndBody := it.NextTagAndBody()
key := valueUnder(mtyp.KeyType, keyTagAndBody.Body())
val := valueUnder(mtyp.ValType, it.Next())
m.scratch = zed.AppendTypeValue(m.scratch[:0], key.Type)
m.scratch = append(m.scratch, keyTagAndBody...)
c.scratch = zed.AppendTypeValue(c.scratch[:0], key.Type)
c.scratch = append(c.scratch, keyTagAndBody...)
// This will squash existing values which is what we want.
m.entries[string(m.scratch)] = mapEntry{key, val}
c.entries[string(c.scratch)] = mapEntry{key, val}
}
}

func (m *Map) ConsumeAsPartial(val *zed.Value) {
m.Consume(val)
func (c *CollectMap) ConsumeAsPartial(val *zed.Value) {
c.Consume(val)
}

func (m *Map) Result(zctx *zed.Context) *zed.Value {
if len(m.entries) == 0 {
func (c *CollectMap) Result(zctx *zed.Context) *zed.Value {
if len(c.entries) == 0 {
return zed.Null
}
var ktypes, vtypes []zed.Type
for _, e := range m.entries {
for _, e := range c.entries {
ktypes = append(ktypes, e.key.Type)
vtypes = append(vtypes, e.val.Type)
}
Expand All @@ -62,7 +62,7 @@ func (m *Map) Result(zctx *zed.Context) *zed.Value {
ktyp, kuniq := unionOf(zctx, ktypes)
vtyp, vuniq := unionOf(zctx, vtypes)
var builder zcode.Builder
for _, e := range m.entries {
for _, e := range c.entries {
appendMapVal(&builder, ktyp, e.key, kuniq)
appendMapVal(&builder, vtyp, e.val, vuniq)
}
Expand All @@ -71,8 +71,8 @@ func (m *Map) Result(zctx *zed.Context) *zed.Value {
return zed.NewValue(typ, b)
}

func (m *Map) ResultAsPartial(zctx *zed.Context) *zed.Value {
return m.Result(zctx)
func (c *CollectMap) ResultAsPartial(zctx *zed.Context) *zed.Value {
return c.Result(zctx)
}

func appendMapVal(b *zcode.Builder, typ zed.Type, val *zed.Value, uniq int) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{k:v}|)"
zed: "collect_map(|{k:v}|)"

output-flags: -pretty 2

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{stock: price}|)"
zed: "collect_map(|{stock: price}|)"

output-flags: -pretty 2

Expand Down
71 changes: 71 additions & 0 deletions runtime/expr/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/zcode"
)

type mapCall struct {
builder zcode.Builder
eval Evaluator
inner Evaluator
zctx *zed.Context

// vals is used to reduce allocations
vals []zed.Value
// types is used to reduce allocations
types []zed.Type
}

// NewMapCall returns an Evaluator that evaluates e to obtain a container and
// then evaluates inner against each of its elements. zctx is used to look up
// the result types.
func NewMapCall(zctx *zed.Context, e, inner Evaluator) Evaluator {
	m := new(mapCall)
	m.zctx = zctx
	m.eval = e
	m.inner = inner
	return m
}

// Eval evaluates the container expression against in, applies the inner
// evaluator to every element, and returns the results as a new array or set.
//
// An error value produced by the container expression is returned unchanged.
// If the elements of the value cannot be extracted, the failure is returned
// as an error value wrapping in. An empty container is returned as is. When
// the per-element results differ in type, the element type of the result is
// the union of those types.
func (a *mapCall) Eval(ectx Context, in *zed.Value) *zed.Value {
	v := a.eval.Eval(ectx, in)
	if v.IsError() {
		return v
	}
	elems, err := v.Elements()
	if err != nil {
		return ectx.CopyValue(*a.zctx.WrapError(err.Error(), in))
	}
	if len(elems) == 0 {
		return v
	}
	// Truncate rather than reallocate so the scratch storage is reused.
	a.vals = a.vals[:0]
	a.types = a.types[:0]
	// Index the slice instead of taking the address of a range variable so
	// each call receives a pointer to its own element.
	for i := range elems {
		out := a.inner.Eval(ectx, &elems[i])
		a.vals = append(a.vals, *out)
		a.types = append(a.types, out.Type)
	}
	inner := a.innerType(a.types)
	bytes := a.buildVal(inner, a.vals)
	// The kind of the result follows the evaluated container v, not in: in
	// is the value the whole expression is evaluated against (for example
	// the enclosing record in map(a, upper)), so testing in.Type would turn
	// a set-valued field into an array.
	if _, ok := zed.TypeUnder(v.Type).(*zed.TypeSet); ok {
		return ectx.NewValue(a.zctx.LookupTypeSet(inner), zed.NormalizeSet(bytes))
	}
	return ectx.NewValue(a.zctx.LookupTypeArray(inner), bytes)
}

// buildVal encodes vals as the body of a container whose element type is
// inner and returns the encoded bytes. When inner is a union, each value is
// written with the tag of its own type within that union; otherwise the raw
// bytes of each value are appended.
//
// The builder is reset at the start of every call and the result comes from
// builder.Bytes().
// NOTE(review): the returned slice presumably aliases the builder's buffer
// and is only valid until the next call — confirm against zcode.Builder.
func (a *mapCall) buildVal(inner zed.Type, vals []zed.Value) []byte {
	a.builder.Reset()
	// Iterate over the vals argument rather than a.vals so the function
	// honors its own parameter.
	if union, ok := inner.(*zed.TypeUnion); ok {
		for i := range vals {
			zed.BuildUnion(&a.builder, union.TagOf(vals[i].Type), vals[i].Bytes())
		}
		return a.builder.Bytes()
	}
	for i := range vals {
		a.builder.Append(vals[i].Bytes())
	}
	return a.builder.Bytes()
}

// innerType picks the element type for the result container. The given types
// are passed through zed.UniqueTypes; if exactly one type remains it is used
// directly, otherwise a union of the remaining types is looked up in the
// context.
func (a *mapCall) innerType(types []zed.Type) zed.Type {
	uniq := zed.UniqueTypes(types)
	if len(uniq) != 1 {
		return a.zctx.LookupTypeUnion(uniq)
	}
	return uniq[0]
}
17 changes: 17 additions & 0 deletions runtime/expr/ztests/map.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
script: |
echo '{a:["foo","bar","baz"]}' | zq -z 'a := map(a,upper)' -
echo '["1","2","3"]' | zq -z 'yield map(this,int64)' -
echo '[1,2,3]' | zq -z -I udf.zed -
inputs:
- name: udf.zed
data: |
func stringify(x): ( cast(x, <string>) )
yield map(this, stringify)
outputs:
- name: stdout
data: |
{a:["FOO","BAR","BAZ"]}
[1,2,3]
["1","2","3"]

0 comments on commit f0f91cf

Please sign in to comment.