From b8cc2670693eca273844bab06fea1a6e48620472 Mon Sep 17 00:00:00 2001 From: iguazio-deploy Date: Mon, 27 Apr 2020 11:14:11 +0000 Subject: [PATCH 1/2] Updated TSDB to v0.10.1 --- go.mod | 6 +- go.sum | 6 ++ vendor/github.com/v3io/frames/go.mod | 4 +- vendor/github.com/v3io/frames/go.sum | 9 ++- vendor/github.com/v3io/frames/types.go | 2 + .../v3io/v3io-go/pkg/dataplane/container.go | 5 ++ .../v3io-go/pkg/dataplane/http/container.go | 12 ++++ .../v3io-go/pkg/dataplane/http/context.go | 22 ++++++ .../v3io/v3io-go/pkg/dataplane/types.go | 11 ++- .../v3io/v3io-tsdb/pkg/appender/ingest.go | 8 +-- .../v3io/v3io-tsdb/pkg/pquerier/collector.go | 72 ++++++++++--------- .../v3io/v3io-tsdb/pkg/pquerier/select.go | 70 +++++++++++++----- vendor/modules.txt | 6 +- 13 files changed, 169 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 688b3c78784..3f4c49570ee 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20180711163814-62bca832be04 github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff // indirect github.com/stretchr/testify v1.4.0 - github.com/v3io/v3io-tsdb v0.9.24 + github.com/v3io/v3io-tsdb v0.10.1 go.opencensus.io v0.19.2 // indirect golang.org/x/net v0.0.0-20190311183353-d8887717615a golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 @@ -113,6 +113,6 @@ replace labix.org/v2/mgo => github.com/go-mgo/mgo v0.0.0-20180705113738-7446a034 replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20180628173108-788fd78401277ebd861206a03c884797c6ec5541 -replace github.com/v3io/frames => github.com/v3io/frames v0.6.15-v0.9.24 +replace github.com/v3io/frames => github.com/v3io/frames v0.7.2 -replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.9.24 +replace github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.10.1 diff --git a/go.sum b/go.sum index fb635c21ea7..28881c3b455 100644 --- a/go.sum +++ b/go.sum @@ -384,6 +384,8 @@ github.com/v3io/frames v0.6.15-v0.9.23 h1:wdobAiy1s/ZEDr5c+Z7aT5KmyHuzoTQdWyionb github.com/v3io/frames v0.6.15-v0.9.23/go.mod h1:oq/QTfSSkszuqPKaywsVEPdoCwyPi903Ue77UdkCPbk= github.com/v3io/frames v0.6.15-v0.9.24 h1:BZGFuxemk3CgCv+hQQ/K+jExGCEAtjfKmfpU7+k1Ls8= github.com/v3io/frames v0.6.15-v0.9.24/go.mod h1:HO3Z241H50cQoZECDKOCISm97PKOrlV3eN7cB9ym6LA= +github.com/v3io/frames v0.7.2 h1:xf7Eli+Uv8bdYCwDFIml4cy+aIAqpLnO4Odfft9Zz/E= +github.com/v3io/frames v0.7.2/go.mod h1:YgnfrzYLRW6DWlE3MTemTwSrraUB1zhdTgyy/wwno8Q= github.com/v3io/v3io-go v0.0.0-20180415000000-1486c75b0e590a14580f7d9b6cef7a944a231ca7 h1:J+ps6exCjowNidrtawSQglJQpKrJ6v8UjBVTNrRTpMs= github.com/v3io/v3io-go v0.0.0-20180415000000-1486c75b0e590a14580f7d9b6cef7a944a231ca7/go.mod h1:MHc+d/Jg/y8lV4B9sgwTvuS3tEE9wS+kqtU0+D0Sr78= github.com/v3io/v3io-go v0.0.0-20190826150152-1f2c9a9a61cb715410a35662f5ddab2b306f95e7 h1:Qx3yIJPtDTJVv/gck/009TrBF/JMwe5RM3N9aTZ4Mlo= @@ -404,6 +406,8 @@ github.com/v3io/v3io-go v0.1.5-0.20200308131040-79c5a91d3daf h1:GdM8m1DKlA1pF4py github.com/v3io/v3io-go v0.1.5-0.20200308131040-79c5a91d3daf/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2 h1:9jIOiEqhu1bUAPl6KFiKvpFhQWkcNGVp/9OC72F+9Os= github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= +github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71 h1:yhMPIAWjePxFnhau/TydO7HhR64+skxvfB6wqDdAOOU= +github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2 h1:NJc63wM25iS+ci5z7LVwjWD4QM0QpTQw/fovKzatss0= github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog= github.com/v3io/v3io-go-http v0.0.0-20190415143924-cc2fbcde6663/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog= @@ -468,6 +472,8 @@ github.com/v3io/v3io-tsdb v0.9.23 h1:BE7AcJACWoh0OXPaXgIUiDB+bFkYWcsgL6s2xa2C60g github.com/v3io/v3io-tsdb v0.9.23/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= github.com/v3io/v3io-tsdb v0.9.24 h1:99C89uv0f8qet4RLuI3baNMUElDeaiVaaO74zbi0qHw= github.com/v3io/v3io-tsdb v0.9.24/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= +github.com/v3io/v3io-tsdb v0.10.1 h1:COY5HMHLsuhoUhczcOR9dnzzzzhhO1s4a+A9wg3OkoM= +github.com/v3io/v3io-tsdb v0.10.1/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.0.0 h1:BwIoZQbBsTo3v2F5lz5Oy3TlTq4wLKTLV260EVTEWco= diff --git a/vendor/github.com/v3io/frames/go.mod b/vendor/github.com/v3io/frames/go.mod index 26bfbb77bba..16cec20cbc1 100644 --- a/vendor/github.com/v3io/frames/go.mod +++ b/vendor/github.com/v3io/frames/go.mod @@ -16,8 +16,8 @@ require ( github.com/pkg/errors v0.8.1 github.com/russross/blackfriday v1.5.2+incompatible // indirect github.com/stretchr/testify v1.4.0 - github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2 - github.com/v3io/v3io-tsdb v0.9.24 + github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71 + github.com/v3io/v3io-tsdb v0.10.1 github.com/valyala/fasthttp v1.2.0 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a diff --git a/vendor/github.com/v3io/frames/go.sum b/vendor/github.com/v3io/frames/go.sum index 5ac8299dc6a..7e2f9b146b3 100644 --- a/vendor/github.com/v3io/frames/go.sum +++ b/vendor/github.com/v3io/frames/go.sum @@ -10,6 +10,9 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dinal/v3io-go v0.0.5-0.20200413103321-8b0a0d3bd31e/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= +github.com/dinal/v3io-go v0.0.5-0.20200413110513-3adb776f958c h1:8KQfRbC21cX0iTvEhN36NBC/m4v3OK56Q0LDo7u5tSI= +github.com/dinal/v3io-go v0.0.5-0.20200413110513-3adb776f958c/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= github.com/dinal/v3io-tsdb v0.0.3-0.20200219152045-0a164c3e9089 h1:5g3dEl/Pczj/nLAoifETKZiixiAtyWpWSsbvX6MdFxQ= github.com/dinal/v3io-tsdb v0.0.3-0.20200219152045-0a164c3e9089/go.mod h1:hdeE2K5HLZsp2NL8VRK9vmMp7cZ2F8x3VoQB2Gr34MI= github.com/dinal/v3io-tsdb v0.0.3-0.20200310095450-9aae98637eb4 h1:ePHOQr5zNsft8Vxuub01YuxpPBGYCmxaGvoiuQEPQM8= @@ -84,8 +87,8 @@ github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.1.0/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -119,6 +122,8 @@ github.com/v3io/v3io-go v0.1.5-0.20200308131040-79c5a91d3daf h1:GdM8m1DKlA1pF4py github.com/v3io/v3io-go v0.1.5-0.20200308131040-79c5a91d3daf/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2 h1:9jIOiEqhu1bUAPl6KFiKvpFhQWkcNGVp/9OC72F+9Os= github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= +github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71 h1:yhMPIAWjePxFnhau/TydO7HhR64+skxvfB6wqDdAOOU= +github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71/go.mod h1:D0W1tjsVgcp4xk3ZI2fjKTKaOpYJLewN1BPN0x2osO4= github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2 h1:NJc63wM25iS+ci5z7LVwjWD4QM0QpTQw/fovKzatss0= github.com/v3io/v3io-go-http v0.0.0-20190221115935-53e2b487c9a2/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog= github.com/v3io/v3io-go-http v0.0.0-20190415143924-cc2fbcde6663/go.mod h1:GXYcR9MxgfbE3BJdkXki5EclvtS8Nxu2RQNLA8hMMog= @@ -160,6 +165,8 @@ github.com/v3io/v3io-tsdb v0.9.23 h1:BE7AcJACWoh0OXPaXgIUiDB+bFkYWcsgL6s2xa2C60g github.com/v3io/v3io-tsdb v0.9.23/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= github.com/v3io/v3io-tsdb v0.9.24 h1:99C89uv0f8qet4RLuI3baNMUElDeaiVaaO74zbi0qHw= github.com/v3io/v3io-tsdb v0.9.24/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= +github.com/v3io/v3io-tsdb v0.10.1 h1:COY5HMHLsuhoUhczcOR9dnzzzzhhO1s4a+A9wg3OkoM= +github.com/v3io/v3io-tsdb v0.10.1/go.mod h1:8xi2vWbHDIVfOsENKPAI+X70F7JZvfD9ucYYJ5YJYeI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.0.0 h1:BwIoZQbBsTo3v2F5lz5Oy3TlTq4wLKTLV260EVTEWco= diff --git a/vendor/github.com/v3io/frames/types.go b/vendor/github.com/v3io/frames/types.go index d764268062a..e176fa9e529 100644 --- a/vendor/github.com/v3io/frames/types.go +++ b/vendor/github.com/v3io/frames/types.go @@ -174,6 +174,8 @@ type WriteRequest struct { Expression string // Condition template, for update conditions generated from combining columns data with expression Condition string + // Columns to partition the data by + PartitionKeys []string // Will we get more message chunks (in a stream), if not we can complete HaveMore bool SaveMode SaveMode diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go index e76e156512a..1b8315efb9e 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/container.go @@ -43,6 +43,11 @@ type Container interface { // // Object // + // CheckPathExists + CheckPathExists(*CheckPathExistsInput, interface{}, chan *Response) (*Request, error) + + // CheckPathExistsSync + CheckPathExistsSync(*CheckPathExistsInput) error // GetObject GetObject(*GetObjectInput, interface{}, chan *Response) (*Request, error) diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go index 1798912797d..da642cead8f 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/container.go @@ -204,6 +204,18 @@ func (c *container) DescribeStreamSync(describeStreamInput *v3io.DescribeStreamI return c.session.context.DescribeStreamSync(describeStreamInput) } +// CheckPathExists +func (c *container) CheckPathExists(checkPathExistsInput *v3io.CheckPathExistsInput, context interface{}, responseChan chan *v3io.Response) (*v3io.Request, error) { + c.populateInputFields(&checkPathExistsInput.DataPlaneInput) + return c.session.context.CheckPathExists(checkPathExistsInput, context, responseChan) +} + +// CheckPathExistsSync +func (c *container) CheckPathExistsSync(checkPathExistsInput *v3io.CheckPathExistsInput) error { + c.populateInputFields(&checkPathExistsInput.DataPlaneInput) + return c.session.context.CheckPathExistsSync(checkPathExistsInput) +} + // DeleteStream func (c *container) DeleteStream(deleteStreamInput *v3io.DeleteStreamInput, context interface{}, responseChan chan *v3io.Response) (*v3io.Request, error) { c.populateInputFields(&deleteStreamInput.DataPlaneInput) diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go index abb65cb4f7f..0372483559a 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/http/context.go @@ -125,6 +125,7 @@ func (c *context) GetContainersSync(getContainersInput *v3io.GetContainersInput) nil, &v3io.GetContainersOutput{}) } + // GetClusterMD func (c *context) GetClusterMD(getClusterMDInput *v3io.GetClusterMDInput, context interface{}, @@ -612,6 +613,25 @@ func (c *context) DescribeStreamSync(describeStreamInput *v3io.DescribeStreamInp return response, nil } +// checkPathExists +func (c *context) CheckPathExists(checkPathExistsInput *v3io.CheckPathExistsInput, + context interface{}, + responseChan chan *v3io.Response) (*v3io.Request, error) { + return c.sendRequestToWorker(checkPathExistsInput, context, responseChan) +} + +// checkPathExistsSync +func (c *context) CheckPathExistsSync(checkPathExistsInput *v3io.CheckPathExistsInput) error { + _, err := c.sendRequest(&checkPathExistsInput.DataPlaneInput, + http.MethodHead, + checkPathExistsInput.Path, + "", + nil, + nil, + true) + return err +} + // DeleteStream func (c *context) DeleteStream(deleteStreamInput *v3io.DeleteStreamInput, context interface{}, @@ -1226,6 +1246,8 @@ func (c *context) workerEntry(workerIndex int) { response, err = c.GetContainerContentsSync(typedInput) case *v3io.GetClusterMDInput: response, err = c.GetClusterMDSync(typedInput) + case *v3io.CheckPathExistsInput: + err = c.CheckPathExistsSync(typedInput) default: c.logger.ErrorWith("Got unexpected request type", "type", reflect.TypeOf(request.Input).String()) } diff --git a/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go b/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go index 2a860aaed91..7edb81c8630 100644 --- a/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go +++ b/vendor/github.com/v3io/v3io-go/pkg/dataplane/types.go @@ -66,7 +66,7 @@ type GetClusterMDInput struct { } type GetClusterMDOutput struct { DataPlaneOutput - NumberOfVNs int + NumberOfVNs int } type GetContainerContentsInput struct { @@ -212,7 +212,7 @@ type PutItemInput struct { type PutItemOutput struct { DataPlaneInput - MtimeSecs int + MtimeSecs int MtimeNSecs int } @@ -240,7 +240,7 @@ type UpdateItemInput struct { type UpdateItemOutput struct { DataPlaneInput - MtimeSecs int + MtimeSecs int MtimeNSecs int } @@ -306,6 +306,11 @@ type CreateStreamInput struct { RetentionPeriodHours int } +type CheckPathExistsInput struct { + DataPlaneInput + Path string +} + type DescribeStreamInput struct { DataPlaneInput Path string diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go index 0f63b0f832d..032c8e0b717 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/appender/ingest.go @@ -285,7 +285,7 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response, if resp.Error != nil && metric.getState() != storeStateGet { req := reqInput.(*v3io.UpdateItemInput) - mc.logger.WarnWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key, + mc.logger.DebugWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key, "in-flight", mc.updatesInFlight, "mqueue", mc.metricQueue.Length(), "numsamples", metric.store.samplesQueueLength(), "path", req.Path, "update expression", req.Expression) } else { @@ -323,9 +323,9 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response, if utils.IsFalseConditionError(resp.Error) { req := reqInput.(*v3io.UpdateItemInput) // This might happen on attempt to add metric value of wrong type, i.e. float <-> string - errMsg := fmt.Sprintf("trying to ingest values of incompatible data type. Metric %q has not been updated.", req.Path) - mc.logger.ErrorWith(errMsg) - setError(mc, metric, errors.Wrap(resp.Error, errMsg)) + errMsg := fmt.Sprintf("failed to ingest values of incompatible data type into metric %s.", req.Path) + mc.logger.DebugWith(errMsg) + setError(mc, metric, errors.New(errMsg)) } else { mc.logger.ErrorWith(fmt.Sprintf("Chunk update failed with status code %d.", e.StatusCode())) setError(mc, metric, errors.Wrap(resp.Error, fmt.Sprintf("Chunk update failed due to status code %d.", e.StatusCode()))) diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go index ff4bb796523..4be0430bd23 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/collector.go @@ -64,45 +64,53 @@ func mainCollector(ctx *selectQueryContext, responseChannel chan *qryResults) { lastTimePerMetric := make(map[uint64]int64, len(ctx.columnsSpecByMetric)) lastValuePerMetric := make(map[uint64]float64, len(ctx.columnsSpecByMetric)) - for res := range responseChannel { - if res.IsRawQuery() { - err := rawCollector(ctx, res) - if err != nil { - ctx.errorChannel <- err - return - } - } else { - err := res.frame.addMetricIfNotExist(res.name, ctx.getResultBucketsSize(), res.IsServerAggregates()) - if err != nil { - ctx.logger.Error("problem adding new metric '%v', lset: %v, err:%v", res.name, res.frame.lset, err) - ctx.errorChannel <- err + for { + select { + case _ = <-ctx.stopChan: + return + case res, ok := <-responseChannel: + if !ok { return } - lsetAttr, _ := res.fields[config.LabelSetAttrName].(string) - lset, _ := utils.LabelsFromString(lsetAttr) - lset = append(lset, utils.Label{Name: config.MetricNameAttrName, Value: res.name}) - currentResultHash := lset.Hash() - - // Aggregating cross series aggregates, only supported over raw data. - if ctx.isCrossSeriesAggregate { - lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], _ = aggregateClientAggregatesCrossSeries(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash]) - } else { - // Aggregating over time aggregates - if res.IsServerAggregates() { - aggregateServerAggregates(ctx, res) - } else if res.IsClientAggregates() { - aggregateClientAggregates(ctx, res) + if res.IsRawQuery() { + err := rawCollector(ctx, res) + if err != nil { + ctx.errorChannel <- err + return } - } - - // It is possible to query an aggregate and down sample raw chunks in the same df. - if res.IsDownsample() { - lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], err = downsampleRawData(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash]) + } else { + err := res.frame.addMetricIfNotExist(res.name, ctx.getResultBucketsSize(), res.IsServerAggregates()) if err != nil { - ctx.logger.Error("problem downsampling '%v', lset: %v, err:%v", res.name, res.frame.lset, err) + ctx.logger.Error("problem adding new metric '%v', lset: %v, err:%v", res.name, res.frame.lset, err) ctx.errorChannel <- err return } + lsetAttr, _ := res.fields[config.LabelSetAttrName].(string) + lset, _ := utils.LabelsFromString(lsetAttr) + lset = append(lset, utils.Label{Name: config.MetricNameAttrName, Value: res.name}) + currentResultHash := lset.Hash() + + // Aggregating cross series aggregates, only supported over raw data. + if ctx.isCrossSeriesAggregate { + lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], _ = aggregateClientAggregatesCrossSeries(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash]) + } else { + // Aggregating over time aggregates + if res.IsServerAggregates() { + aggregateServerAggregates(ctx, res) + } else if res.IsClientAggregates() { + aggregateClientAggregates(ctx, res) + } + } + + // It is possible to query an aggregate and down sample raw chunks in the same df. + if res.IsDownsample() { + lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], err = downsampleRawData(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash]) + if err != nil { + ctx.logger.Error("problem downsampling '%v', lset: %v, err:%v", res.name, res.frame.lset, err) + ctx.errorChannel <- err + return + } + } } } } diff --git a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go index 6c6861c7d83..4f3cd12127e 100644 --- a/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go +++ b/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go @@ -44,6 +44,10 @@ type selectQueryContext struct { requestChannels []chan *qryResults errorChannel chan error wg sync.WaitGroup + createDFLock sync.Mutex + stopChan chan bool + queryWG sync.WaitGroup + finalErrorChan chan error } func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *SelectParams) (*frameIterator, error) { @@ -77,18 +81,21 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params * } } + queryCtx.stopChan = make(chan bool, 1) + queryCtx.finalErrorChan = make(chan error, 1) + queryCtx.errorChannel = make(chan error, queryCtx.workers+len(queries)) + err = queryCtx.startCollectors() if err != nil { return nil, err } for _, query := range queries { - err = queryCtx.processQueryResults(query) - if err != nil { - return nil, err - } + queryCtx.queryWG.Add(1) + go processQueryResults(queryCtx, query) } + queryCtx.queryWG.Wait() for i := 0; i < queryCtx.workers; i++ { close(queryCtx.requestChannels[i]) } @@ -98,7 +105,7 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params * close(queryCtx.errorChannel) // return first error - err = <-queryCtx.errorChannel + err = <-queryCtx.finalErrorChan if err != nil { return nil, err } @@ -260,7 +267,6 @@ func (queryCtx *selectQueryContext) parsePreAggregateLabels(partition *partmgr.D func (queryCtx *selectQueryContext) startCollectors() error { queryCtx.requestChannels = make([]chan *qryResults, queryCtx.workers) - queryCtx.errorChannel = make(chan error, queryCtx.workers) // Increment the WaitGroup counter. queryCtx.wg.Add(queryCtx.workers) @@ -274,27 +280,45 @@ func (queryCtx *selectQueryContext) startCollectors() error { }(i) } + // Watch error channel, and signal all go routines to stop in case of an error + go func() { + // Signal all goroutines to stop when error received + err, ok := <-queryCtx.errorChannel + if ok && err != nil { + close(queryCtx.stopChan) + queryCtx.finalErrorChan <- err + } + + close(queryCtx.finalErrorChan) + return + }() + return nil } -func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error { +func processQueryResults(queryCtx *selectQueryContext, query *partQuery) { + defer queryCtx.queryWG.Done() + for query.Next() { // read metric name name, ok := query.GetField(config.MetricNameAttrName).(string) if !ok { - return fmt.Errorf("could not find metric name attribute in response, res:%v", query.GetFields()) + queryCtx.errorChannel <- fmt.Errorf("could not find metric name attribute in response, res:%v", query.GetFields()) + return } // read label set lsetAttr, lok := query.GetField(config.LabelSetAttrName).(string) if !lok { - return fmt.Errorf("could not find label set attribute in response, res:%v", query.GetFields()) + queryCtx.errorChannel <- fmt.Errorf("could not find label set attribute in response, res:%v", query.GetFields()) + return } lset, err := utils.LabelsFromString(lsetAttr) if err != nil { - return err + queryCtx.errorChannel <- err + return } // read chunk encoding type @@ -306,7 +330,8 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error } else { intEncoding, err := strconv.Atoi(encodingStr) if err != nil { - return fmt.Errorf("error parsing encoding type of chunk, got: %v, error: %v", encodingStr, err) + queryCtx.errorChannel <- fmt.Errorf("error parsing encoding type of chunk, got: %v, error: %v", encodingStr, err) + return } encoding = chunkenc.Encoding(intEncoding) } @@ -324,7 +349,8 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error if labelValue != "" { newLset[i] = utils.Label{Name: trimmed, Value: labelValue} } else { - return fmt.Errorf("no label named %v found to group by", trimmed) + queryCtx.errorChannel <- fmt.Errorf("no label named %v found to group by", trimmed) + return } } lset = newLset @@ -336,6 +362,7 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error hash = lset.Hash() } + queryCtx.createDFLock.Lock() // find or create data frame frame, ok := queryCtx.dataFrames[hash] if !ok { @@ -349,19 +376,30 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error results.IsServerAggregates(), queryCtx.showAggregateLabel) if err != nil { - return err + queryCtx.errorChannel <- err + queryCtx.createDFLock.Unlock() + return } queryCtx.dataFrames[hash] = frame queryCtx.frameList = append(queryCtx.frameList, frame) } + queryCtx.createDFLock.Unlock() results.frame = frame - workerNum := hash & uint64(queryCtx.workers-1) - queryCtx.requestChannels[workerNum] <- &results + + // In case termination signal was received exit, Otherwise send query result to worker + select { + case _ = <-queryCtx.stopChan: + return + case queryCtx.requestChannels[workerNum] <- &results: + } + } - return query.Err() + if query.Err() != nil { + queryCtx.errorChannel <- query.Err() + } } func (queryCtx *selectQueryContext) createColumnSpecs() ([]columnMeta, map[string][]columnMeta, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index cbf53c5e821..d9da2c8b868 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -298,15 +298,15 @@ github.com/shurcooL/vfsgen github.com/stretchr/testify/require github.com/stretchr/testify/suite github.com/stretchr/testify/assert -# github.com/v3io/frames v0.6.14-v0.9.18 => github.com/v3io/frames v0.6.15-v0.9.24 +# github.com/v3io/frames v0.6.14-v0.9.18 => github.com/v3io/frames v0.7.2 github.com/v3io/frames github.com/v3io/frames/pb -# github.com/v3io/v3io-go v0.1.5-0.20200316155009-cc83de374ff2 +# github.com/v3io/v3io-go v0.1.5-0.20200413162202-5d20cf2c5c71 github.com/v3io/v3io-go/pkg/dataplane github.com/v3io/v3io-go/pkg/errors github.com/v3io/v3io-go/pkg/dataplane/http github.com/v3io/v3io-go/pkg/dataplane/schemas/node/common -# github.com/v3io/v3io-tsdb v0.9.24 => github.com/v3io/v3io-tsdb v0.9.24 +# github.com/v3io/v3io-tsdb v0.10.1 => github.com/v3io/v3io-tsdb v0.10.1 github.com/v3io/v3io-tsdb/pkg/aggregate github.com/v3io/v3io-tsdb/pkg/appender github.com/v3io/v3io-tsdb/pkg/config From 5bc4072b06091666b37d5104cb5782a1c5f5c173 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Mon, 27 Apr 2020 17:12:33 +0300 Subject: [PATCH 2/2] Use semver only. Drop custom pattern. --- Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 764e3224039..b2d96037d8d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -21,8 +21,8 @@ podTemplate(label: "${git_project}-${label}", inheritFrom: "jnlp-docker") { common.notify_slack { stage('get tag data') { container('jnlp') { - TAG_VERSION = github.get_tag_version(TAG_NAME, '^(v[\\.0-9]*.*-v[\\.0-9]*|unstable)\$') - DOCKER_TAG_VERSION = github.get_docker_tag_version(TAG_NAME, '^(v[\\.0-9]*.*-v[\\.0-9]*|unstable)\$') + TAG_VERSION = github.get_tag_version(TAG_NAME) + DOCKER_TAG_VERSION = github.get_docker_tag_version(TAG_NAME) echo "$TAG_VERSION" echo "$DOCKER_TAG_VERSION"