Skip to content

Commit

Permalink
Merge pull request #91 from nspcc-dev/feat/search-load
Browse files Browse the repository at this point in the history
Feat/search load
  • Loading branch information
roman-khimov authored Aug 12, 2024
2 parents 0d3cabc + abecf3c commit bcbeb1c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
57 changes: 57 additions & 0 deletions internal/native/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,63 @@ func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
return PutContainerResponse{Success: true, ContainerID: contID.EncodeToString()}
}

type Filter struct {
Key string
Operation string
Value string
}

// Search searches for the objects in container that satisfies provided filters.
// Returns number of found objects.
func (c *Client) Search(cnrString string, filtersJS []Filter) (int, error) {
var cID cid.ID
err := cID.DecodeString(cnrString)
if err != nil {
return 0, fmt.Errorf("reading container ID: %w", err)
}

var op object.SearchMatchType
var filters object.SearchFilters
for _, flt := range filtersJS {
if !op.DecodeString(flt.Operation) {
return 0, fmt.Errorf("unknown filter operation: %s", flt.Operation)
}

filters.AddFilter(flt.Key, flt.Value, op)
}

var prm client.PrmObjectSearch
prm.SetFilters(filters)

start := time.Now()

r, err := c.cli.ObjectSearchInit(c.vu.Context(), cID, c.signer, prm)
if err != nil {
return 0, fmt.Errorf("search stream initialization: %w", err)
}
defer func() {
_ = r.Close()
}()

var objsNum int
err = r.Iterate(func(_ oid.ID) bool {
objsNum++
return false
})
if err != nil {
return 0, fmt.Errorf("reading search results: %w", err)
}

var relativeTime time.Duration
if objsNum > 0 {
relativeTime = time.Since(start) / time.Duration(objsNum)
}

stats.Report(c.vu, objSearchDurationRelative, metrics.D(relativeTime))

return objsNum, nil
}

func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject {
maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
objGetTotal, objGetFails, objGetDuration *metrics.Metric
objDeleteTotal, objDeleteFails, objDeleteDuration *metrics.Metric
cnrPutTotal, cnrPutFails, cnrPutDuration *metrics.Metric
objSearchDurationRelative *metrics.Metric
)

func init() {
Expand Down Expand Up @@ -136,6 +137,8 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
cnrPutFails, _ = registry.NewMetric("neofs_cnr_put_fails", metrics.Counter)
cnrPutDuration, _ = registry.NewMetric("neofs_cnr_put_duration", metrics.Trend, metrics.Time)

objSearchDurationRelative, _ = registry.NewMetric("neofs_search_duration_relative", metrics.Trend, metrics.Time)

return &Client{
vu: n.vu,
signer: signer,
Expand Down
35 changes: 35 additions & 0 deletions scenarios/grpc_search.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { check } from 'k6';
import native from 'k6/x/neofs/native';

const dial_timeout = 5
const stream_timeout = 15
const predefined_private_key = '' // usually no need for search requests in load tests
const grpc_client = native.connect(__ENV.GRPC_ENDPOINT, predefined_private_key, dial_timeout, stream_timeout);
const container = __ENV.cid

export const options = {
scenarios: {
system_write: {
executor: 'shared-iterations',
vus: __ENV.vu,
iterations: __ENV.i,
exec: 'search',
maxDuration: (24*365*100).toString()+"h", // default is 10m and this load is designed to be controlled by iterations only
gracefulStop: '30s',
},
},
};

export function search() {
let res = grpc_client.search(container, [{
key: "test",
operation: "STRING_EQUAL",
value: "test"
}])
check(res, {
'search': (r) => {
return r > 0;
}
}
)
}

0 comments on commit bcbeb1c

Please sign in to comment.