Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/Policer enhancements #2600

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ Changelog for NeoFS Node

## [Unreleased]

### Added
- Policer's settings to the SN's application configuration (#2600)

### Fixed
- `neofs-cli netmap netinfo` documentation (#2555)
- `GETRANGEHASH` to a node without an object produced `GETRANGE` or `GET` requests (#2541, #2598)
Expand All @@ -13,9 +16,11 @@ Changelog for NeoFS Node
### Changed
- FSTree storage now uses more efficient and safe temporary files under Linux (#2566)
- BoltDB open timeout increased from 100ms to 1s (#2499)
- Internal container cache size from 10 to 1000 (#2600)

### Removed
- deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496)
- Recently-handled objects Policer's cache (#2600)

### Updated
- Update minimal supported Go version up to v1.19 (#2485)
Expand Down
33 changes: 33 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/storage"
"github.com/nspcc-dev/neofs-node/misc"
Expand All @@ -56,6 +57,7 @@ import (
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
Expand Down Expand Up @@ -100,6 +102,13 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []storage.ShardCfg
}

PolicerCfg struct {
maxCapacity uint32
headTimeout time.Duration
replicationCooldown time.Duration
objectBatchSize uint32
}
}

// readConfig fills applicationConfiguration with raw configuration values
Expand All @@ -126,6 +135,13 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {

a.LoggerCfg.level = loggerconfig.Level(c)

// Policer

a.PolicerCfg.maxCapacity = policerconfig.MaxWorkers(c)
a.PolicerCfg.headTimeout = policerconfig.HeadTimeout(c)
a.PolicerCfg.replicationCooldown = policerconfig.ReplicationCooldown(c)
a.PolicerCfg.objectBatchSize = policerconfig.ObjectBatchSize(c)

// Storage Engine

a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
Expand Down Expand Up @@ -304,6 +320,8 @@ type shared struct {

respSvc *response.Service

policer *policer.Policer

replicator *replicator.Replicator

treeService *tree.Service
Expand Down Expand Up @@ -727,6 +745,17 @@ func (c *cfg) shardOpts() []shardOptsWithID {
return shards
}

// policerOpts translates the Policer section of the currently read
// application configuration into a set of policer service options,
// used both at start-up and on SIGHUP reconfiguration.
func (c *cfg) policerOpts() []policer.Option {
	pc := c.applicationConfiguration.PolicerCfg

	opts := make([]policer.Option, 0, 4)
	opts = append(opts, policer.WithMaxCapacity(pc.maxCapacity))
	opts = append(opts, policer.WithHeadTimeout(pc.headTimeout))
	opts = append(opts, policer.WithReplicationCooldown(pc.replicationCooldown))
	opts = append(opts, policer.WithObjectBatchSize(pc.objectBatchSize))

	return opts
}

// LocalAddress returns the node's local network address group
// as stored in the application configuration.
func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
Expand Down Expand Up @@ -905,6 +934,10 @@ func (c *cfg) configWatcher(ctx context.Context) {
continue
}

// Policer

c.shared.policer.Reload(c.policerOpts()...)

// Storage Engine

var rcfg engine.ReConfiguration
Expand Down
47 changes: 47 additions & 0 deletions cmd/neofs-node/config/policer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ const (

// HeadTimeoutDefault is a default object.Head request timeout in policer.
HeadTimeoutDefault = 5 * time.Second

// ReplicationCooldownDefault is a default cooldown time b/w replication tasks
// submitting.
ReplicationCooldownDefault = time.Duration(0)
// ObjectBatchSizeDefault is a default replication's objects batch size.
ObjectBatchSizeDefault = 10
// MaxWorkersDefault is a default replication's worker pool's maximum size.
MaxWorkersDefault = 20
)

// HeadTimeout returns the value of "head_timeout" config parameter
Expand All @@ -25,3 +33,42 @@ func HeadTimeout(c *config.Config) time.Duration {

return HeadTimeoutDefault
}

// ReplicationCooldown returns the value of "replication_cooldown" config parameter
// from "policer" section.
//
// Returns ReplicationCooldownDefault if a value is not a positive duration.
func ReplicationCooldown(c *config.Config) time.Duration {
	if v := config.DurationSafe(c.Sub(subsection), "replication_cooldown"); v > 0 {
		return v
	}

	return ReplicationCooldownDefault
}

// ObjectBatchSize returns the value of "object_batch_size" config parameter
// from "policer" section.
//
// Returns ObjectBatchSizeDefault if a value is not a positive number.
func ObjectBatchSize(c *config.Config) uint32 {
	if v := config.Uint32Safe(c.Sub(subsection), "object_batch_size"); v > 0 {
		return v
	}

	return ObjectBatchSizeDefault
}

// MaxWorkers returns the value of "max_workers" config parameter
// from "policer" section.
//
// Returns MaxWorkersDefault if a value is not a positive number.
func MaxWorkers(c *config.Config) uint32 {
	if v := config.Uint32Safe(c.Sub(subsection), "max_workers"); v > 0 {
		return v
	}

	return MaxWorkersDefault
}
6 changes: 6 additions & 0 deletions cmd/neofs-node/config/policer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ func TestPolicerSection(t *testing.T) {
empty := configtest.EmptyConfig()

require.Equal(t, policerconfig.HeadTimeoutDefault, policerconfig.HeadTimeout(empty))
require.Equal(t, policerconfig.ReplicationCooldownDefault, policerconfig.ReplicationCooldown(empty))
require.Equal(t, uint32(policerconfig.ObjectBatchSizeDefault), policerconfig.ObjectBatchSize(empty))
require.Equal(t, uint32(policerconfig.MaxWorkersDefault), policerconfig.MaxWorkers(empty))
})

const path = "../../../../config/example/node"

var fileConfigTest = func(c *config.Config) {
require.Equal(t, 15*time.Second, policerconfig.HeadTimeout(c))
require.Equal(t, 101*time.Millisecond, policerconfig.ReplicationCooldown(c))
require.Equal(t, uint32(11), policerconfig.ObjectBatchSize(c))
require.Equal(t, uint32(21), policerconfig.MaxWorkers(c))
}

configtest.ForEachFileType(path, fileConfigTest)
Expand Down
13 changes: 6 additions & 7 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
Expand Down Expand Up @@ -190,7 +189,7 @@ func initObjectService(c *cfg) {
),
)

pol := policer.New(
c.policer = policer.New(
policer.WithLogger(c.log),
policer.WithLocalStorage(ls),
policer.WithContainerSource(c.cfgObject.cnrSource),
Expand All @@ -201,9 +200,7 @@ func initObjectService(c *cfg) {
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
),
policer.WithHeadTimeout(c.applicationConfiguration.PolicerCfg.headTimeout),
policer.WithReplicator(c.replicator),
policer.WithRedundantCopyCallback(func(addr oid.Address) {
var inhumePrm engine.InhumePrm
Expand All @@ -216,15 +213,17 @@ func initObjectService(c *cfg) {
)
}
}),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithMaxCapacity(c.applicationConfiguration.PolicerCfg.maxCapacity),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithNodeLoader(c),
policer.WithNetwork(c),
policer.WithReplicationCooldown(c.applicationConfiguration.PolicerCfg.replicationCooldown),
policer.WithObjectBatchSize(c.applicationConfiguration.PolicerCfg.objectBatchSize),
)

traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)

c.workers = append(c.workers, pol)
c.workers = append(c.workers, c.policer)

var os putsvc.ObjectStorage = engineWithoutNotifications{
engine: ls,
Expand Down
5 changes: 5 additions & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ NEOFS_APICLIENT_ALLOW_EXTERNAL=true

# Policer section
NEOFS_POLICER_HEAD_TIMEOUT=15s
NEOFS_POLICER_CACHE_SIZE=1000001
NEOFS_POLICER_CACHE_TIME=31s
NEOFS_POLICER_REPLICATION_COOLDOWN=101ms
NEOFS_POLICER_OBJECT_BATCH_SIZE=11
NEOFS_POLICER_MAX_WORKERS=21

# Replicator section
NEOFS_REPLICATOR_PUT_TIMEOUT=15s
Expand Down
7 changes: 6 additions & 1 deletion config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@
"allow_external": true
},
"policer": {
"head_timeout": "15s"
"head_timeout": "15s",
"cache_size": "1000001",
"cache_time": "31s",
"replication_cooldown": "101ms",
"object_batch_size": "11",
"max_workers": "21"
},
"replicator": {
"pool_size": 10,
Expand Down
5 changes: 5 additions & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ apiclient:

policer:
head_timeout: 15s # timeout for the Policer HEAD remote operation
cache_size: 1000001 # recently-handled objects cache size
cache_time: 31s # recently-handled objects cache expiration time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size and time to cache section?

but 1st of all: why do we expose all parameters while #2590 suggests replication_cooldown only?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size and time to cache section?

well, could be done but i am almost sure the resulting config will change so lets wait testing results and see what we need

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we expose all parameters while #2590 suggests replication_cooldown only?

the whole PR is a suggestion. we have magic consts and I am not sure how they work. so now we have a PR that can be tested

replication_cooldown: 101ms # cooldown time b/w replication tasks submitting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's implemented like interval, not cooldown, so why is it called so?

Copy link
Member Author

@carpawell carpawell Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's implemented like interval, not cooldown

what do you mean here? the current implementation is what i would call a cooldown (but i could be wrong)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if replication_cooldown is 10s and last replication took 5s, then next replication will start after 5s or 10s? If 10s, then yes, this is cooldown

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use some ascii paintings here

Rs -- replication started
Rf -- replication finished

Rs-----Rf-----Rs--------------------Rf/Rs
|______||_____||______________________|
   5s      5s            20s

a cooldown to me is "i used some func, next time i will be able to use it no earlier than after *cooldown_time*"
an interval to me is "i used some func, lets sleep for *interval_time*"

object_batch_size: 11 # replication's objects batch size
max_workers: 21 # replication's worker pool's maximum size

replicator:
put_timeout: 15s # timeout for the Replicator PUT remote operation (defaults to 1m)
Expand Down
13 changes: 13 additions & 0 deletions docs/sighup.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

Logger level can be reloaded with a SIGHUP.

## Policer

Available for reconfiguration fields:

```yml
head_timeout:
cache_size:
cache_time:
replication_cooldown:
object_batch_size:
max_workers:
```

## Storage engine

Shards can be added, removed or reloaded with SIGHUP.
Expand Down
12 changes: 9 additions & 3 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,17 @@ Configuration for the Policer service. It ensures that object is stored accordin
```yaml
policer:
head_timeout: 15s
replication_cooldown: 100ms
object_batch_size: 10
max_workers: 20
```

| Parameter | Type | Default value | Description |
|----------------|------------|---------------|----------------------------------------------|
| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. |
| Parameter | Type | Default value | Description |
|------------------------|------------|---------------|-----------------------------------------------------|
| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. |
| `replication_cooldown` | `duration` | `0s`          | Cooldown time between submitting replication tasks. |
| `object_batch_size` | `int` | `10` | Replication's objects batch size. |
| `max_workers` | `int` | `20` | Replication's worker pool's maximum size. |

# `replicator` section

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object_manager/placement/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type netMapSrc struct {
}

// defaultContainerCacheSize is the default size for the container cache.
const defaultContainerCacheSize = 10
const defaultContainerCacheSize = 1000

func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder {
cache, _ := simplelru.NewLRU[string, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
Expand Down
6 changes: 5 additions & 1 deletion pkg/services/policer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ type processPlacementContext struct {
func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)

p.cfg.RLock()
headTimeout := p.headTimeout
p.cfg.RUnlock()

// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int

Expand Down Expand Up @@ -262,7 +266,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
continue
}

callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
callCtx, cancel := context.WithTimeout(ctx, headTimeout)

_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))

Expand Down
Loading