Skip to content

Commit

Permalink
dev vector query: support concatenated csup files (#5526)
Browse files Browse the repository at this point in the history
dev vector query: fix CSUP reader
  • Loading branch information
mattnibs authored Dec 13, 2024
1 parent 883ffd2 commit a32651f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
18 changes: 8 additions & 10 deletions cmd/super/dev/vector/query/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package query
import (
"errors"
"flag"
"os"

"github.com/brimdata/super"
"github.com/brimdata/super/cli/outputflags"
Expand All @@ -11,10 +12,9 @@ import (
"github.com/brimdata/super/pkg/charm"
"github.com/brimdata/super/pkg/storage"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/vcache"
"github.com/brimdata/super/zbuf"
"github.com/brimdata/super/zio"
"github.com/segmentio/ksuid"
"github.com/brimdata/super/zio/vngio"
)

var spec = &charm.Spec{
Expand Down Expand Up @@ -57,23 +57,21 @@ func (c *Command) Run(args []string) error {
return errors.New("usage: query followed by a single path argument of VNG data")
}
text := args[0]
uri, err := storage.ParseURI(args[1])
f, err := os.Open(args[1])
if err != nil {
return err
}
local := storage.NewLocalEngine()
cache := vcache.NewCache(local)
object, err := cache.Fetch(ctx, uri, ksuid.Nil)
rctx := runtime.NewContext(ctx, super.NewContext())
r, err := vngio.NewVectorReader(ctx, rctx.Zctx, f, nil)
if err != nil {
return err
}
defer object.Close()
rctx := runtime.NewContext(ctx, super.NewContext())
puller, err := compiler.VectorCompile(rctx, text, object)
defer r.Pull(true)
puller, err := compiler.VectorCompile(rctx, text, r)
if err != nil {
return err
}
writer, err := c.outputFlags.Open(ctx, local)
writer, err := c.outputFlags.Open(ctx, storage.NewLocalEngine())
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions compiler/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/brimdata/super/runtime/sam/op"
"github.com/brimdata/super/runtime/vam"
vamop "github.com/brimdata/super/runtime/vam/op"
"github.com/brimdata/super/runtime/vcache"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/zbuf"
"github.com/brimdata/super/zio"
)
Expand Down Expand Up @@ -118,7 +118,7 @@ func bundleOutputs(rctx *runtime.Context, outputs map[string]zbuf.Puller) zbuf.P
// where the entire query is vectorizable. It does not call optimize
// nor does it compute the demand of the query to prune the projection
// of the supplied vector.Puller.
func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (zbuf.Puller, error) {
func VectorCompile(rctx *runtime.Context, query string, puller vector.Puller) (zbuf.Puller, error) {
ast, err := parser.ParseQuery(query)
if err != nil {
return nil, err
Expand All @@ -132,7 +132,6 @@ func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (
panic("DAG assumptions violated")
}
entry = entry[1:]
puller := vam.NewVectorProjection(rctx.Zctx, object, nil) //XXX project all
builder := kernel.NewBuilder(rctx, env)
outputs, err := builder.BuildWithPuller(entry, puller)
if err != nil {
Expand Down
18 changes: 8 additions & 10 deletions ztest/ztest.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ import (
"github.com/brimdata/super/compiler"
"github.com/brimdata/super/compiler/parser"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/vcache"
"github.com/brimdata/super/vector"
"github.com/brimdata/super/vng"
"github.com/brimdata/super/zbuf"
"github.com/brimdata/super/zio"
"github.com/brimdata/super/zio/anyio"
"github.com/brimdata/super/zio/vngio"
"github.com/brimdata/super/zio/zsonio"
"github.com/pmezard/go-difflib/difflib"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -554,12 +555,12 @@ func runvec(zedProgram string, input string, outputFlags []string) (string, erro
if err := flags.Parse(outputFlags); err != nil {
return "", err
}
object, err := createVCacheObject(input)
r, err := createVPuller(input)
if err != nil {
return "", err
}
defer object.Close()
puller, err := compiler.VectorCompile(runtime.DefaultContext(), zedProgram, object)
defer r.Pull(true)
puller, err := compiler.VectorCompile(runtime.DefaultContext(), zedProgram, r)
if err != nil {
return "", err
}
Expand All @@ -575,16 +576,13 @@ func runvec(zedProgram string, input string, outputFlags []string) (string, erro
return outbuf.String(), err
}

func createVCacheObject(input string) (*vcache.Object, error) {
func createVPuller(input string) (vector.Puller, error) {
var buf bytes.Buffer
w := vng.NewWriter(zio.NopCloser(&buf))
r := zsonio.NewReader(super.NewContext(), strings.NewReader(input))
if err := errors.Join(zio.Copy(w, r), w.Close()); err != nil {
return nil, err
}
o, err := vng.NewObject(bytes.NewReader(buf.Bytes()))
if err != nil {
return nil, err
}
return vcache.NewObjectFromVNG(o), nil
rctx := runtime.DefaultContext()
return vngio.NewVectorReader(rctx.Context, rctx.Zctx, bytes.NewReader(buf.Bytes()), nil)
}

0 comments on commit a32651f

Please sign in to comment.