Skip to content

Commit

Permalink
Merge pull request #5115 from onflow/yahya/peer-protection-with-flag
Browse files Browse the repository at this point in the history
[Networking] Stream protection and connection manager configuration
  • Loading branch information
yhassanzadeh13 authored Dec 15, 2023
2 parents 02bb636 + 831ce2a commit 48214e5
Show file tree
Hide file tree
Showing 41 changed files with 627 additions and 481 deletions.
30 changes: 23 additions & 7 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,15 +967,31 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.BoolVar(&builder.rpcMetricsEnabled, "rpc-metrics-enabled", defaultConfig.rpcMetricsEnabled, "whether to enable the rpc metrics")
flags.UintVar(&builder.TxResultCacheSize, "transaction-result-cache-size", defaultConfig.TxResultCacheSize, "transaction result cache size.(Disabled by default i.e 0)")
flags.UintVar(&builder.TxErrorMessagesCacheSize, "transaction-error-messages-cache-size", defaultConfig.TxErrorMessagesCacheSize, "transaction error messages cache size.(By default 1000)")
flags.StringVarP(&builder.nodeInfoFile, "node-info-file", "", defaultConfig.nodeInfoFile, "full path to a json file which provides more details about nodes when reporting its reachability metrics")
flags.StringVarP(&builder.nodeInfoFile,
"node-info-file",
"",
defaultConfig.nodeInfoFile,
"full path to a json file which provides more details about nodes when reporting its reachability metrics")
flags.StringToIntVar(&builder.apiRatelimits, "api-rate-limits", defaultConfig.apiRatelimits, "per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.BoolVar(&builder.supportsObserver, "supports-observer", defaultConfig.supportsObserver, "true if this staked access node supports observer or follower connections")
flags.StringVar(&builder.PublicNetworkConfig.BindAddress, "public-network-address", defaultConfig.PublicNetworkConfig.BindAddress, "staked access node's public network bind address")
flags.BoolVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "circuit-breaker-enabled", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled, "specifies whether the circuit breaker is enabled for collection and execution API clients.")
flags.DurationVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "circuit-breaker-restore-timeout", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout, "duration after which the circuit breaker will restore the connection to the client after closing it due to failures. Default value is 60s")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "circuit-breaker-max-failures", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures, "maximum number of failed calls to the client that will cause the circuit breaker to close the connection. Default value is 5")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "circuit-breaker-max-requests", defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests, "maximum number of requests to check if connection restored after timeout. Default value is 1")
flags.BoolVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled,
"circuit-breaker-enabled",
defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.Enabled,
"specifies whether the circuit breaker is enabled for collection and execution API clients.")
flags.DurationVar(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout,
"circuit-breaker-restore-timeout",
defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.RestoreTimeout,
"duration after which the circuit breaker will restore the connection to the client after closing it due to failures. Default value is 60s")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures,
"circuit-breaker-max-failures",
defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxFailures,
"maximum number of failed calls to the client that will cause the circuit breaker to close the connection. Default value is 5")
flags.Uint32Var(&builder.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests,
"circuit-breaker-max-requests",
defaultConfig.rpcConf.BackendConfig.CircuitBreakerConfig.MaxRequests,
"maximum number of requests to check if connection restored after timeout. Default value is 1")
// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled,
"execution-data-sync-enabled",
Expand Down Expand Up @@ -1679,7 +1695,7 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
// - Any error encountered during initialization. Any error should be considered fatal.
func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.PrivateKey, bindAddress string, networkMetrics module.LibP2PMetrics) (p2p.LibP2PNode,
error) {
connManager, err := connection.NewConnManager(builder.Logger, networkMetrics, &builder.FlowConfig.NetworkConfig.ConnectionManagerConfig)
connManager, err := connection.NewConnManager(builder.Logger, networkMetrics, &builder.FlowConfig.NetworkConfig.ConnectionManager)
if err != nil {
return nil, fmt.Errorf("could not create connection manager: %w", err)
}
Expand All @@ -1706,7 +1722,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
},
&p2pconfig.UnicastConfig{
UnicastConfig: builder.FlowConfig.NetworkConfig.UnicastConfig,
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetBasicResolver(builder.Resolver).
SetSubscriptionFilter(subscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
Expand Down
2 changes: 1 addition & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
},
&p2pconfig.UnicastConfig{
UnicastConfig: builder.FlowConfig.NetworkConfig.UnicastConfig,
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetSubscriptionFilter(
subscription.NewRoleBasedFilter(
Expand Down
28 changes: 14 additions & 14 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,21 +309,21 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {

// setup default rate limiter options
unicastRateLimiterOpts := []ratelimit.RateLimitersOption{
ratelimit.WithDisabledRateLimiting(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun),
ratelimit.WithDisabledRateLimiting(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun),
ratelimit.WithNotifier(fnb.UnicastRateLimiterDistributor),
}

// override noop unicast message rate limiter
if fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit > 0 {
if fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit > 0 {
unicastMessageRateLimiter := ratelimiter.NewRateLimiter(
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration,
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.MessageRateLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.LockoutDuration,
)
unicastRateLimiterOpts = append(unicastRateLimiterOpts, ratelimit.WithMessageRateLimiter(unicastMessageRateLimiter))

// avoid connection gating and pruning during dry run
if !fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun {
if !fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun {
f := rateLimiterPeerFilter(unicastMessageRateLimiter)
// add IsRateLimited peerFilters to conn gater intercept secure peer and peer manager filters list
// don't allow rate limited peers to establishing incoming connections
Expand All @@ -334,16 +334,16 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
}

// override noop unicast bandwidth rate limiter
if fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit > 0 && fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthBurstLimit > 0 {
if fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthRateLimit > 0 && fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthBurstLimit > 0 {
unicastBandwidthRateLimiter := ratelimit.NewBandWidthRateLimiter(
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthBurstLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.LockoutDuration,
rate.Limit(fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthRateLimit),
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.BandwidthBurstLimit,
fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.LockoutDuration,
)
unicastRateLimiterOpts = append(unicastRateLimiterOpts, ratelimit.WithBandwidthRateLimiter(unicastBandwidthRateLimiter))

// avoid connection gating and pruning during dry run
if !fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.DryRun {
if !fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast.RateLimiter.DryRun {
f := rateLimiterPeerFilter(unicastBandwidthRateLimiter)
// add IsRateLimited peerFilters to conn gater intercept secure peer and peer manager filters list
connGaterInterceptSecureFilters = append(connGaterInterceptSecureFilters, f)
Expand All @@ -355,7 +355,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
unicastRateLimiters := ratelimit.NewRateLimiters(unicastRateLimiterOpts...)

uniCfg := &p2pconfig.UnicastConfig{
UnicastConfig: fnb.BaseConfig.FlowConfig.NetworkConfig.UnicastConfig,
Unicast: fnb.BaseConfig.FlowConfig.NetworkConfig.Unicast,
RateLimiterDistributor: fnb.UnicastRateLimiterDistributor,
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
&fnb.FlowConfig.NetworkConfig.GossipSub,
&fnb.FlowConfig.NetworkConfig.ResourceManager,
uniCfg,
&fnb.FlowConfig.NetworkConfig.ConnectionManagerConfig,
&fnb.FlowConfig.NetworkConfig.ConnectionManager,
&p2p.DisallowListCacheConfig{
MaxSize: fnb.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(fnb.HeroCacheMetricsFactory(), network.PrivateNetwork),
Expand Down Expand Up @@ -492,7 +492,7 @@ func (fnb *FlowNodeBuilder) InitFlowNetworkWithConduitFactory(
IdentityProvider: fnb.IdentityProvider,
ReceiveCache: receiveCache,
ConduitFactory: cf,
UnicastMessageTimeout: fnb.FlowConfig.NetworkConfig.UnicastMessageTimeout,
UnicastMessageTimeout: fnb.FlowConfig.NetworkConfig.Unicast.MessageTimeout,
IdentityTranslator: fnb.IDTranslator,
AlspCfg: &alspmgr.MisbehaviorReportManagerConfig{
Logger: fnb.Logger,
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func TestDefaultConfig(t *testing.T) {
func TestFlowConfig_Validate(t *testing.T) {
c := defaultConfig(t)
// set invalid config values
c.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.MessageRateLimit = -100
c.NetworkConfig.UnicastConfig.UnicastRateLimitersConfig.BandwidthRateLimit = -100
c.NetworkConfig.Unicast.RateLimiter.MessageRateLimit = -100
c.NetworkConfig.Unicast.RateLimiter.BandwidthRateLimit = -100
err := c.Validate()
require.Error(t, err)
errs, ok := errors.Unwrap(err).(validator.ValidationErrors)
Expand Down
Loading

0 comments on commit 48214e5

Please sign in to comment.