From 44c58042a200150aada20d0735d08eda92228ba1 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 18 Jul 2024 23:14:16 +0300 Subject: [PATCH] internal/native: add `Search` operation to extension Simple operation, supports generic filters and relative metrics (seconds per single object in results). Signed-off-by: Pavel Karpy --- internal/native/client.go | 52 +++++++++++++++++++++++++++++++++++++++ internal/native/native.go | 3 +++ 2 files changed, 55 insertions(+) diff --git a/internal/native/client.go b/internal/native/client.go index 6ab41fb..3b7f2a4 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -323,6 +323,58 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse { return PutContainerResponse{Success: true, ContainerID: contID.EncodeToString()} } +type filter struct { + Key string + Operation string + Value string +} + +func (c *Client) Search(cnrString string, filtersJS []filter) (int, error) { + var cID cid.ID + err := cID.DecodeString(cnrString) + if err != nil { + return 0, fmt.Errorf("reading container ID: %w", err) + } + + var op object.SearchMatchType + var filters object.SearchFilters + for _, flt := range filtersJS { + if !op.DecodeString(flt.Operation) { + return 0, fmt.Errorf("unknown filter operation: %s", flt.Operation) + } + + filters.AddFilter(flt.Key, flt.Value, op) + } + + var prm client.PrmObjectSearch + prm.SetFilters(filters) + + start := time.Now() + + r, err := c.cli.ObjectSearchInit(c.vu.Context(), cID, c.signer, prm) + if err != nil { + return 0, fmt.Errorf("search stream initialization: %w", err) + } + + var objsNum int + err = r.Iterate(func(oID oid.ID) bool { + objsNum++ + return false + }) + if err != nil { + return 0, fmt.Errorf("reading search results: %w", err) + } + + var relativeTime time.Duration + if objsNum > 0 { + relativeTime = time.Since(start) / time.Duration(objsNum) + } + + stats.Report(c.vu, objSearchDurationRelative, metrics.D(relativeTime)) + + return objsNum, nil +} + func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject { maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) if err != nil { diff --git a/internal/native/native.go b/internal/native/native.go index 60c4cc6..3f59b47 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -33,6 +33,7 @@ var ( objGetTotal, objGetFails, objGetDuration *metrics.Metric objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric cnrPutTotal, cnrPutFails, cnrPutDuration *metrics.Metric + objSearchDurationRelative *metrics.Metric ) func init() { @@ -136,6 +137,8 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime cnrPutFails, _ = registry.NewMetric("neofs_cnr_put_fails", metrics.Counter) cnrPutDuration, _ = registry.NewMetric("neofs_cnr_put_duration", metrics.Trend, metrics.Time) + objSearchDurationRelative, _ = registry.NewMetric("neofs_search_duration_relative", metrics.Trend, metrics.Time) + return &Client{ vu: n.vu, signer: signer,