Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map function #4806

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
* Add [`regexp()`](docs/language/functions/regexp.md) function for regular expression searches and capture groups (#4145, #4158)
* Add [`coalesce()`](docs/language/functions/coalesce.md) function for locating non-null/non-error values (#4172)
* Add `line` format for sourcing newline-delimited input as strings (#4175)
* Add [`map()` aggregation function](docs/language/aggregates/map.md) for constructing [maps](docs/formats/zed.md#24-map) #4173
* Add [`collect_map()` aggregation function](docs/language/aggregates/collect_map.md) for constructing [maps](docs/formats/zed.md#24-map) #4173

## v1.2.0
* Compress index values (#3974)
Expand Down
6 changes: 6 additions & 0 deletions compiler/ast/dag/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ type (
Kind string `json:"kind" unpack:""`
Value string `json:"value"`
}
MapCall struct {
Kind string `json:"kind" unpack:""`
Expr Expr `json:"expr"`
Inner Expr `json:"inner"`
}
Comment on lines +65 to +69
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a downside to using Call rather than adding MapCall?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its kind of a different thing than a call. A call is made of args that you get the value of all by passing this into it, a map call you evaluate this, then iterate over each value an call the inner eval on each value. So I kind of think this should remain MapCall

MapExpr struct {
Kind string `json:"kind" unpack:""`
Entries []Entry `json:"entries"`
Expand Down Expand Up @@ -121,6 +126,7 @@ func (*Conditional) ExprDAG() {}
func (*Dot) ExprDAG() {}
func (*Func) ExprDAG() {}
func (*Literal) ExprDAG() {}
func (*MapCall) ExprDAG() {}
func (*MapExpr) ExprDAG() {}
func (*OverExpr) ExprDAG() {}
func (*RecordExpr) ExprDAG() {}
Expand Down
1 change: 1 addition & 0 deletions compiler/ast/dag/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var unpacker = unpack.New(
Lister{},
Literal{},
Load{},
MapCall{},
MapExpr{},
Merge{},
Over{},
Expand Down
14 changes: 14 additions & 0 deletions compiler/kernel/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (b *Builder) compileExpr(e dag.Expr) (expr.Evaluator, error) {
return b.compileArrayExpr(e)
case *dag.SetExpr:
return b.compileSetExpr(e)
case *dag.MapCall:
return b.compileMapCall(e)
case *dag.MapExpr:
return b.compileMapExpr(e)
case *dag.Agg:
Expand Down Expand Up @@ -309,6 +311,18 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) {
return expr.NewCall(b.zctx(), fn, exprs), nil
}

func (b *Builder) compileMapCall(a *dag.MapCall) (expr.Evaluator, error) {
e, err := b.compileExpr(a.Expr)
if err != nil {
return nil, err
}
inner, err := b.compileExpr(a.Inner)
if err != nil {
return nil, err
}
return expr.NewMapCall(b.zctx(), e, inner), nil
}

func (b *Builder) compileShaper(node dag.Call, tf expr.ShaperTransform) (expr.Evaluator, error) {
args := node.Args
field, err := b.compileExpr(args[0])
Expand Down
21 changes: 21 additions & 0 deletions compiler/semantic/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,27 @@ func (a *analyzer) semCall(call *ast.Call) (dag.Expr, error) {
if nargs == 1 {
exprs = append([]dag.Expr{&dag.This{Kind: "This"}}, exprs...)
}
case name == "map":
if err := function.CheckArgCount(nargs, 2, 2); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
}
id, ok := call.Args[1].(*ast.ID)
if !ok {
return nil, fmt.Errorf("%s(): second argument must be the identifier of a function", name)
}
inner, err := a.semCall(&ast.Call{
Kind: "Call",
Name: id.Name,
Args: []ast.Expr{&ast.ID{Kind: "ID", Name: "this"}},
})
if err != nil {
return nil, err
}
return &dag.MapCall{
Kind: "MapCall",
Expr: exprs[0],
Inner: inner,
}, nil
default:
if _, _, err = function.New(a.zctx, name, nargs); err != nil {
return nil, fmt.Errorf("%s(): %w", name, err)
Expand Down
2 changes: 1 addition & 1 deletion docs/language/aggregates/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ value for a sequence of input values.
- [any](any.md) - select an arbitrary value from its input
- [avg](avg.md) - average value
- [collect](collect.md) - aggregate values into array
- [collect_map](collect_map.md) - aggregate map values into a single map
- [count](count.md) - count input values
- [dcount](dcount.md) - count distinct input values
- [fuse](fuse.md) - compute a fused type of input values
- [map](map.md) - aggregate map values into a single map
- [max](max.md) - maximum value of input values
- [min](min.md) - minimum value of input values
- [or](or.md) - logical OR of input values
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
### Aggregate Function

  **map** — aggregate map values into a single map
  **collect_map** — aggregate map values into a single map

### Synopsis
```
map(|{any:any}|) -> |{any:any}|
collect_map(|{any:any}|) -> |{any:any}|
```

### Description

The _map_ aggregate function combines map inputs into a single map output.
If _map_ receives multiple values for the same key, the last value received is
The _collect_map_ aggregate function combines map inputs into a single map output.
If _collect_map_ receives multiple values for the same key, the last value received is
retained. If the input keys or values vary in type, the return type will be a map
of union of those types.

### Examples

Combine a sequence of records into a map:
```mdtest-command
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{stock:price}|)' -
echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'collect_map(|{stock:price}|)' -
```
=>
```mdtest-output
Expand All @@ -27,7 +27,7 @@ echo '{stock:"APPL",price:145.03} {stock:"GOOG",price:87.07}' | zq -z 'map(|{sto

Continuous collection over a simple sequence:
```mdtest-command
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield map(this)' -
echo '|{"APPL":145.03}| |{"GOOG":87.07}| |{"APPL":150.13}|' | zq -z 'yield collect_map(this)' -
```
=>
```mdtest-output
Expand Down
1 change: 1 addition & 0 deletions docs/language/functions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Zed's [primitive types](../../formats/zed.md#1-primitive-types), e.g.,
* [levenshtein](levenshtein.md) Levenshtein distance
* [log](log.md) - natural logarithm
* [lower](lower.md) - convert a string to lower case
* [map](map.md) - apply a function to each element of an array
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
* [missing](missing.md) - test for the "missing" error
* [nameof](nameof.md) - the name of a named type
* [nest_dotted](nest_dotted.md) - transform fields in a record with dotted names to nested records
Expand Down
42 changes: 42 additions & 0 deletions docs/language/functions/map.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
### Function

  **map** — calls a function on each element of an array and returns the results
mattnibs marked this conversation as resolved.
Show resolved Hide resolved

### Synopsis

```
map(v: array|set, f: function) -> array|set
nwt marked this conversation as resolved.
Show resolved Hide resolved
```

### Description

The _map_ function applies function f to every element in array/set v and
returns the augmented array/set. Function f must be a function that takes
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
only one argument. A user defined function can be used for f.

### Examples

Upper case each element of an array:

```mdtest-command
echo '["foo","bar","baz"]' | zq -z 'yield map(this, upper)' -
```
=>
```mdtest-output
["FOO","BAR","BAZ"]
```

Using a user defined function to convert an epoch float to a time:

```mdtest-input udf.zed
func floatToTime(x): ( cast(x*1000000000, <time>) )
yield map(this, floatToTime)
```

```mdtest-command
echo '[1697151533.41415,1697151540.716529]' | zq -z -I udf.zed -
```
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
=>
```mdtest-output
[2023-10-12T22:58:53.414149888Z,2023-10-12T22:59:00.716528896Z]
```
4 changes: 2 additions & 2 deletions runtime/expr/agg/agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func NewPattern(op string, hasarg bool) (Pattern, error) {
pattern = func() Function {
return newMathReducer(anymath.Add)
}
case "map":
case "collect_map":
pattern = func() Function {
return newMap()
return newCollectMap()
}
case "min":
pattern = func() Function {
Expand Down
30 changes: 15 additions & 15 deletions runtime/expr/agg/map.go → runtime/expr/agg/collectmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"golang.org/x/exp/slices"
)

type Map struct {
type CollectMap struct {
entries map[string]mapEntry
scratch []byte
}

func newMap() *Map {
return &Map{entries: make(map[string]mapEntry)}
func newCollectMap() *CollectMap {
return &CollectMap{entries: make(map[string]mapEntry)}
}

var _ Function = (*Collect)(nil)
Expand All @@ -22,7 +22,7 @@ type mapEntry struct {
val *zed.Value
}

func (m *Map) Consume(val *zed.Value) {
func (c *CollectMap) Consume(val *zed.Value) {
if val.IsNull() {
return
}
Expand All @@ -36,23 +36,23 @@ func (m *Map) Consume(val *zed.Value) {
keyTagAndBody := it.NextTagAndBody()
key := valueUnder(mtyp.KeyType, keyTagAndBody.Body())
val := valueUnder(mtyp.ValType, it.Next())
m.scratch = zed.AppendTypeValue(m.scratch[:0], key.Type)
m.scratch = append(m.scratch, keyTagAndBody...)
c.scratch = zed.AppendTypeValue(c.scratch[:0], key.Type)
c.scratch = append(c.scratch, keyTagAndBody...)
// This will squash existing values which is what we want.
m.entries[string(m.scratch)] = mapEntry{key, val}
c.entries[string(c.scratch)] = mapEntry{key, val}
}
}

func (m *Map) ConsumeAsPartial(val *zed.Value) {
m.Consume(val)
func (c *CollectMap) ConsumeAsPartial(val *zed.Value) {
c.Consume(val)
}

func (m *Map) Result(zctx *zed.Context) *zed.Value {
if len(m.entries) == 0 {
func (c *CollectMap) Result(zctx *zed.Context) *zed.Value {
if len(c.entries) == 0 {
return zed.Null
}
var ktypes, vtypes []zed.Type
for _, e := range m.entries {
for _, e := range c.entries {
ktypes = append(ktypes, e.key.Type)
vtypes = append(vtypes, e.val.Type)
}
Expand All @@ -62,7 +62,7 @@ func (m *Map) Result(zctx *zed.Context) *zed.Value {
ktyp, kuniq := unionOf(zctx, ktypes)
vtyp, vuniq := unionOf(zctx, vtypes)
var builder zcode.Builder
for _, e := range m.entries {
for _, e := range c.entries {
appendMapVal(&builder, ktyp, e.key, kuniq)
appendMapVal(&builder, vtyp, e.val, vuniq)
}
Expand All @@ -71,8 +71,8 @@ func (m *Map) Result(zctx *zed.Context) *zed.Value {
return zed.NewValue(typ, b)
}

func (m *Map) ResultAsPartial(zctx *zed.Context) *zed.Value {
return m.Result(zctx)
func (c *CollectMap) ResultAsPartial(zctx *zed.Context) *zed.Value {
return c.Result(zctx)
}

func appendMapVal(b *zcode.Builder, typ zed.Type, val *zed.Value, uniq int) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{k:v}|)"
zed: "collect_map(|{k:v}|)"

output-flags: -pretty 2

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
zed: "map(|{stock: price}|)"
zed: "collect_map(|{stock: price}|)"

output-flags: -pretty 2

Expand Down
71 changes: 71 additions & 0 deletions runtime/expr/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package expr

import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/zcode"
)

type mapCall struct {
builder zcode.Builder
eval Evaluator
inner Evaluator
zctx *zed.Context

// vals is used to reduce allocations
vals []zed.Value
// types is used to reduce allocations
types []zed.Type
}

func NewMapCall(zctx *zed.Context, e, inner Evaluator) Evaluator {
return &mapCall{eval: e, inner: inner, zctx: zctx}
}

func (a *mapCall) Eval(ectx Context, in *zed.Value) *zed.Value {
v := a.eval.Eval(ectx, in)
if v.IsError() {
return v
}
elems, err := v.Elements()
if err != nil {
return ectx.CopyValue(*a.zctx.WrapError(err.Error(), in))
}
if len(elems) == 0 {
return v
}
a.vals = a.vals[:0]
a.types = a.types[:0]
for _, elem := range elems {
out := a.inner.Eval(ectx, &elem)
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
a.vals = append(a.vals, *out)
a.types = append(a.types, out.Type)
}
inner := a.innerType(a.types)
bytes := a.buildVal(inner, a.vals)
if _, ok := zed.TypeUnder(in.Type).(*zed.TypeSet); ok {
return ectx.NewValue(a.zctx.LookupTypeSet(inner), zed.NormalizeSet(bytes))
}
return ectx.NewValue(a.zctx.LookupTypeArray(inner), bytes)
}

func (a *mapCall) buildVal(inner zed.Type, vals []zed.Value) []byte {
a.builder.Reset()
if union, ok := inner.(*zed.TypeUnion); ok {
for _, val := range a.vals {
zed.BuildUnion(&a.builder, union.TagOf(val.Type), val.Bytes())
}
} else {
for _, val := range a.vals {
a.builder.Append(val.Bytes())
}
}
return a.builder.Bytes()
}

func (a *mapCall) innerType(types []zed.Type) zed.Type {
types = zed.UniqueTypes(types)
if len(types) == 1 {
return types[0]
}
return a.zctx.LookupTypeUnion(types)
}
17 changes: 17 additions & 0 deletions runtime/expr/ztests/map.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
script: |
echo '{a:["foo","bar","baz"]}' | zq -z 'a := map(a,upper)' -
echo '["1","2","3"]' | zq -z 'yield map(this,int64)' -
echo '[1,2,3]' | zq -z -I udf.zed -

inputs:
- name: udf.zed
data: |
func stringify(x): ( cast(x, <string>) )
yield map(this, stringify)

outputs:
- name: stdout
data: |
{a:["FOO","BAR","BAZ"]}
[1,2,3]
["1","2","3"]