diff --git a/internal/native/client.go b/internal/native/client.go index 6ab41fb..ae476cd 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -323,6 +323,63 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse { return PutContainerResponse{Success: true, ContainerID: contID.EncodeToString()} } +type Filter struct { + Key string + Operation string + Value string +} + +// Search searches for the objects in container that satisfies provided filters. +// Returns number of found objects. +func (c *Client) Search(cnrString string, filtersJS []Filter) (int, error) { + var cID cid.ID + err := cID.DecodeString(cnrString) + if err != nil { + return 0, fmt.Errorf("reading container ID: %w", err) + } + + var op object.SearchMatchType + var filters object.SearchFilters + for _, flt := range filtersJS { + if !op.DecodeString(flt.Operation) { + return 0, fmt.Errorf("unknown filter operation: %s", flt.Operation) + } + + filters.AddFilter(flt.Key, flt.Value, op) + } + + var prm client.PrmObjectSearch + prm.SetFilters(filters) + + start := time.Now() + + r, err := c.cli.ObjectSearchInit(c.vu.Context(), cID, c.signer, prm) + if err != nil { + return 0, fmt.Errorf("search stream initialization: %w", err) + } + defer func() { + _ = r.Close() + }() + + var objsNum int + err = r.Iterate(func(_ oid.ID) bool { + objsNum++ + return false + }) + if err != nil { + return 0, fmt.Errorf("reading search results: %w", err) + } + + var relativeTime time.Duration + if objsNum > 0 { + relativeTime = time.Since(start) / time.Duration(objsNum) + } + + stats.Report(c.vu, objSearchDurationRelative, metrics.D(relativeTime)) + + return objsNum, nil +} + func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject { maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) if err != nil { diff --git a/internal/native/native.go b/internal/native/native.go index 60c4cc6..3f59b47 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -33,6 +33,7 @@ var ( objGetTotal, objGetFails, objGetDuration *metrics.Metric objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric cnrPutTotal, cnrPutFails, cnrPutDuration *metrics.Metric + objSearchDurationRelative *metrics.Metric ) func init() { @@ -136,6 +137,8 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime cnrPutFails, _ = registry.NewMetric("neofs_cnr_put_fails", metrics.Counter) cnrPutDuration, _ = registry.NewMetric("neofs_cnr_put_duration", metrics.Trend, metrics.Time) + objSearchDurationRelative, _ = registry.NewMetric("neofs_search_duration_relative", metrics.Trend, metrics.Time) + return &Client{ vu: n.vu, signer: signer, diff --git a/scenarios/grpc_search.js b/scenarios/grpc_search.js new file mode 100644 index 0000000..e4f47ef --- /dev/null +++ b/scenarios/grpc_search.js @@ -0,0 +1,35 @@ +import { check } from 'k6'; +import native from 'k6/x/neofs/native'; + +const dial_timeout = 5 +const stream_timeout = 15 +const predefined_private_key = '' // usually no need for search requests in load tests +const grpc_client = native.connect(__ENV.GRPC_ENDPOINT, predefined_private_key, dial_timeout, stream_timeout); +const container = __ENV.cid + +export const options = { + scenarios: { + system_write: { + executor: 'shared-iterations', + vus: __ENV.vu, + iterations: __ENV.i, + exec: 'search', + maxDuration: (24*365*100).toString()+"h", // default is 10m and this load is designed to be controlled by iterations only + gracefulStop: '30s', + }, + }, +}; + +export function search() { + let res = grpc_client.search(container, [{ + key: "test", + operation: "STRING_EQUAL", + value: "test" + }]) + check(res, { + 'search': (r) => { + return r > 0; + } + } + ) +}