diff --git a/compiler/describe.go b/compiler/describe.go
new file mode 100644
index 0000000000..816f1b1b64
--- /dev/null
+++ b/compiler/describe.go
@@ -0,0 +1,28 @@
+package compiler
+
+import (
+	"context"
+	"errors"
+
+	"github.com/brimdata/zed/compiler/data"
+	"github.com/brimdata/zed/compiler/describe"
+	"github.com/brimdata/zed/compiler/semantic"
+	"github.com/brimdata/zed/lakeparse"
+)
+
+// Describe parses query, runs semantic.AnalyzeAddSource on it with src and
+// head, and returns the describe.Info that describe.Analyze computes.
+func Describe(ctx context.Context, query string, src *data.Source, head *lakeparse.Commitish) (*describe.Info, error) {
+	seq, err := Parse(query)
+	if err != nil {
+		return nil, err
+	}
+	if len(seq) == 0 {
+		return nil, errors.New("internal error: AST seq cannot be empty")
+	}
+	entry, err := semantic.AnalyzeAddSource(ctx, seq, src, head)
+	if err != nil {
+		return nil, err
+	}
+	return describe.Analyze(ctx, src, entry)
+}
diff --git a/compiler/describe/analyze.go b/compiler/describe/analyze.go
new file mode 100644
index 0000000000..360ba92a53
--- /dev/null
+++ b/compiler/describe/analyze.go
@@ -0,0 +1,195 @@
+package describe
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/brimdata/zed/compiler/ast/dag"
+	"github.com/brimdata/zed/compiler/data"
+	"github.com/brimdata/zed/compiler/optimizer"
+	"github.com/brimdata/zed/lake"
+	"github.com/brimdata/zed/order"
+	"github.com/brimdata/zed/pkg/field"
+	"github.com/segmentio/ksuid"
+)
+
+// Info is the result of Analyze: the sources a query reads from and one
+// Channel per output of the query.
+type Info struct {
+	Sources  []Source  `json:"sources"`
+	Channels []Channel `json:"channels"`
+}
+
+// Source is implemented by the types that may appear in Info.Sources. The
+// Source method is only a marker; every implementation here is empty.
+type Source interface {
+	Source()
+}
+
+// LakeMeta, Pool, and Path are the Source implementations. Their Kind field
+// is set to the type's name wherever this file constructs them.
+type (
+	LakeMeta struct {
+		Kind string `json:"kind"`
+		Meta string `json:"meta"`
+	}
+	Pool struct {
+		Kind string      `json:"kind"`
+		Name string      `json:"name"`
+		ID   ksuid.KSUID `json:"id"`
+	}
+	Path struct {
+		Kind string `json:"kind"`
+		URI  string `json:"uri"`
+	}
+)
+
+func (*LakeMeta) Source() {}
+func (*Pool) Source()     {}
+func (*Path) Source()     {}
+
+// Channel describes one output of a query. AggregationKeys is nil for an
+// output that is not the result of an aggregation, and Sort is nil when the
+// sort key of the output is nil.
+type Channel struct {
+	AggregationKeys field.List     `json:"aggregation_keys"`
+	Sort            *order.SortKey `json:"sort"`
+}
+
+// Analyze describes the compiled query seq. Sources are taken from the first
+// operator of seq, and one Channel is produced for each sort key returned by
+// the optimizer's SortKeys, paired by index with the aggregation keys from
+// describeAggs. An error is returned, instead of indexing out of range, when
+// seq is empty or when there are fewer aggregation key lists than sort keys.
+func Analyze(ctx context.Context, source *data.Source, seq dag.Seq) (*Info, error) {
+	if len(seq) == 0 {
+		return nil, errors.New("internal error: DAG seq cannot be empty")
+	}
+	var info Info
+	var err error
+	if info.Sources, err = describeSources(ctx, source.Lake(), seq[0]); err != nil {
+		return nil, err
+	}
+	sortKeys, err := optimizer.New(ctx, source).SortKeys(seq)
+	if err != nil {
+		return nil, err
+	}
+	aggKeys := describeAggs(seq, []field.List{nil})
+	if len(aggKeys) < len(sortKeys) {
+		return nil, fmt.Errorf("internal error: %d aggregation key lists for %d channels", len(aggKeys), len(sortKeys))
+	}
+	for i := range sortKeys {
+		// Convert SortKey to a pointer so a nil sort is encoded as null for
+		// JSON/ZSON.
+		var s *order.SortKey
+		if !sortKeys[i].IsNil() {
+			s = &sortKeys[i]
+		}
+		info.Channels = append(info.Channels, Channel{
+			Sort:            s,
+			AggregationKeys: aggKeys[i],
+		})
+	}
+	return &info, nil
+}
+
+// describeSources returns the sources read by o. A fork contributes the
+// sources of the first operator of each of its paths; an operator that is
+// not a fork or one of the scans below is reported as an error.
+func describeSources(ctx context.Context, lk *lake.Root, o dag.Op) ([]Source, error) {
+	switch o := o.(type) {
+	case *dag.Fork:
+		var s []Source
+		for _, p := range o.Paths {
+			if len(p) == 0 {
+				return nil, errors.New("internal error: fork path cannot be empty")
+			}
+			out, err := describeSources(ctx, lk, p[0])
+			if err != nil {
+				return nil, err
+			}
+			s = append(s, out...)
+		}
+		return s, nil
+	case *dag.DefaultScan:
+		return []Source{&Path{Kind: "Path", URI: "stdio://stdin"}}, nil
+	case *dag.FileScan:
+		return []Source{&Path{Kind: "Path", URI: o.Path}}, nil
+	case *dag.HTTPScan:
+		return []Source{&Path{Kind: "Path", URI: o.URL}}, nil
+	case *dag.PoolScan:
+		return sourceOfPool(ctx, lk, o.ID)
+	case *dag.Lister:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.SeqScan:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.CommitMetaScan:
+		return sourceOfPool(ctx, lk, o.Pool)
+	case *dag.LakeMetaScan:
+		return []Source{&LakeMeta{Kind: "LakeMeta", Meta: o.Meta}}, nil
+	default:
+		return nil, fmt.Errorf("unsupported source type %T", o)
+	}
+}
+
+// sourceOfPool opens the pool with the given ID in lk and returns a Pool
+// source carrying the pool's name. A nil lk is reported as an error so that
+// a caller without a lake gets a normal failure rather than a panic.
+func sourceOfPool(ctx context.Context, lk *lake.Root, id ksuid.KSUID) ([]Source, error) {
+	if lk == nil {
+		return nil, errors.New("internal error: lake operation cannot be used in non-lake context")
+	}
+	p, err := lk.OpenPool(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	return []Source{&Pool{
+		Kind: "Pool",
+		ID:   id,
+		Name: p.Name,
+	}}, nil
+}
+
+// describeAggs applies describeOpAggs to each operator of seq in order,
+// starting from parents, and returns the resulting aggregation keys.
+func describeAggs(seq dag.Seq, parents []field.List) []field.List {
+	for _, op := range seq {
+		parents = describeOpAggs(op, parents)
+	}
+	return parents
+}
+
+// describeOpAggs returns the aggregation keys for the outputs of op given
+// those of its parents. A nil field.List stands for "no aggregation".
+func describeOpAggs(op dag.Op, parents []field.List) []field.List {
+	switch op := op.(type) {
+	case *dag.Fork:
+		var aggs []field.List
+		for _, p := range op.Paths {
+			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
+		}
+		return aggs
+	case *dag.Scatter:
+		var aggs []field.List
+		for _, p := range op.Paths {
+			aggs = append(aggs, describeAggs(p, []field.List{nil})...)
+		}
+		return aggs
+	case *dag.Summarize:
+		// The field list for aggregation with no keys is an empty slice and
+		// not nil.
+		keys := field.List{}
+		for _, k := range op.Keys {
+			// NOTE(review): this assertion panics unless the key's LHS is a
+			// *dag.This; confirm semantic analysis guarantees that.
+			keys = append(keys, k.LHS.(*dag.This).Path)
+		}
+		return []field.List{keys}
+	}
+	// If more than one parent reset to nil aggregation.
+	if len(parents) > 1 {
+		return []field.List{nil}
+	}
+	return parents
+}
diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go
index 5f6c90389c..f398e94341 100644
--- a/compiler/optimizer/optimizer.go
+++ b/compiler/optimizer/optimizer.go
@@ -257,6 +257,13 @@ func (o *Optimizer) optimizeSourcePaths(seq dag.Seq) (dag.Seq, error) {
 	})
 }
 
+// SortKeys returns the sort keys computed by propagateSortKey for seq,
+// starting from a single parent with a nil sort key. The analysis is run on
+// the result of copyOps(seq) rather than on seq itself.
+func (o *Optimizer) SortKeys(seq dag.Seq) ([]order.SortKey, error) {
+	return o.propagateSortKey(copyOps(seq), []order.SortKey{order.Nil})
+}
+
 // propagateSortKey analyzes a Seq and attempts to push the scan order of the data source
 // into the first downstream aggregation.  (We could continue the analysis past that
 // point but don't bother yet because we do not yet support any optimization
@@ -330,7 +337,7 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]or
 		// We'll live this as unknown for now even though the groupby
 		// and not try to optimize downstream of the first groupby
 		// unless there is an excplicit sort encountered.
-		return nil, nil
+		return []order.SortKey{order.Nil}, nil
 	case *dag.Fork:
 		var keys []order.SortKey
 		for _, seq := range op.Paths {
diff --git a/service/core.go b/service/core.go
index 84faf0a0eb..5eb886d602 100644
--- a/service/core.go
+++ b/service/core.go
@@ -187,6 +187,7 @@ func (c *Core) addAPIServerRoutes() {
 	c.authhandle("/pool/{pool}/revision/{revision}/vector", handleVectorDelete).Methods("DELETE")
 	c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
 	c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
+	c.authhandle("/query/describe", handleQueryDescribe).Methods("OPTIONS", "POST")
 	c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
 }
 
diff --git a/service/handlers.go b/service/handlers.go
index 42995e7030..a412c2c20f 100644
--- a/service/handlers.go
+++ b/service/handlers.go
@@ -12,12 +12,14 @@ import (
 	"github.com/brimdata/zed/api/queryio"
 	"github.com/brimdata/zed/compiler"
 	"github.com/brimdata/zed/compiler/ast"
+	"github.com/brimdata/zed/compiler/data"
 	"github.com/brimdata/zed/compiler/optimizer/demand"
 	"github.com/brimdata/zed/lake"
 	lakeapi "github.com/brimdata/zed/lake/api"
 	"github.com/brimdata/zed/lake/commits"
 	"github.com/brimdata/zed/lake/journal"
 	"github.com/brimdata/zed/lakeparse"
+	"github.com/brimdata/zed/pkg/storage"
 	"github.com/brimdata/zed/runtime"
 	"github.com/brimdata/zed/runtime/exec"
 	"github.com/brimdata/zed/runtime/sam/op"
@@ -170,6 +172,24 @@ func handleCompile(c *Core, w *ResponseWriter, r *Request) {
 	w.Respond(http.StatusOK, ast)
 }
 
+// handleQueryDescribe is the handler registered for /query/describe. It
+// decodes an api.QueryRequest from the request and responds with the value
+// returned by compiler.Describe for that query; an error from Describe is
+// reported through srverr.ErrInvalid.
+func handleQueryDescribe(c *Core, w *ResponseWriter, r *Request) {
+	var body api.QueryRequest
+	if ok := r.Unmarshal(w, &body); !ok {
+		return
+	}
+	source := data.NewSource(storage.NewRemoteEngine(), c.root)
+	desc, err := compiler.Describe(r.Context(), body.Query, source, &body.Head)
+	if err != nil {
+		w.Error(srverr.ErrInvalid(err))
+		return
+	}
+	w.Respond(http.StatusOK, desc)
+}
+
 func handleBranchGet(c *Core, w *ResponseWriter, r *Request) {
 	branchName, ok := 
r.StringFromPath(w, "branch") if !ok { diff --git a/service/ztests/query-describe.yaml b/service/ztests/query-describe.yaml new file mode 100644 index 0000000000..aae3638eee --- /dev/null +++ b/service/ztests/query-describe.yaml @@ -0,0 +1,94 @@ +script: | + source service.sh + zed create -q test1 + zed create -q test2 + for file in multifrom.zed agg.zed agg-no-keys.zed; do + echo // === $file === + query="$(cat $file | jq -Rsa .)" + curl -H "Accept: application/json" -d "{\"query\":$query,\"head\":{\"pool\":\"test1\"}}" $ZED_LAKE/query/describe | + zq -J 'sources := (over sources | id := "XXX")' - + done + + +inputs: + - name: service.sh + - name: multifrom.zed + data: | + from ( + pool test1 + pool test2 + ) | put foo := "bar" + - name: agg.zed + data: | + count() by key1:=v1, key2 + - name: agg-no-keys.zed + data: | + sum(this) + +outputs: + - name: stdout + data: | + // === multifrom.zed === + { + "sources": [ + { + "kind": "Pool", + "name": "test1", + "id": "XXX" + }, + { + "kind": "Pool", + "name": "test2", + "id": "XXX" + } + ], + "channels": [ + { + "aggregation_keys": null, + "sort": { + "order": "desc", + "keys": [ + [ + "ts" + ] + ] + } + } + ] + } + // === agg.zed === + { + "sources": { + "kind": "Pool", + "name": "test1", + "id": "XXX" + }, + "channels": [ + { + "aggregation_keys": [ + [ + "key1" + ], + [ + "key2" + ] + ], + "sort": null + } + ] + } + // === agg-no-keys.zed === + { + "sources": { + "kind": "Pool", + "name": "test1", + "id": "XXX" + }, + "channels": [ + { + "aggregation_keys": [ + ], + "sort": null + } + ] + }