Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use AlwaysRouteFilteringStrategy if alwaysRoute is enabled #141

Merged
merged 23 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/checks/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ var defaultUpstreamConfig = &config.UpstreamConfig{
}

var defaultRoutingConfig = &config.RoutingConfig{
PassiveLatencyChecking: true,
DetectionWindow: config.NewDuration(10 * time.Minute),
BanWindow: config.NewDuration(50 * time.Minute),
DetectionWindow: config.NewDuration(10 * time.Minute),
BanWindow: config.NewDuration(50 * time.Minute),
Errors: &config.ErrorsConfig{
Rate: 0.25,
HTTPCodes: []string{
Expand Down
260 changes: 98 additions & 162 deletions internal/checks/errorlatency.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package checks

import (
"context"
"math"
"net/http"
"strconv"
Expand Down Expand Up @@ -129,125 +128,92 @@ func NewCircuitBreaker(
Build()
}

// ErrorLatencyCheck tracks request errors and per-RPC-method latencies for an upstream via circuit breakers.
// Error checking is disabled if `errorCircuitBreaker` is nil.
// Latency checking is disabled if `methodLatencyBreaker` is nil.
// At least one of these two must be non-nil.
type ErrorLatencyCheck struct {
client client.EthClient
Err error
clientGetter client.EthClientGetter
metricsContainer *metrics.Container
logger *zap.Logger
upstreamConfig *conf.UpstreamConfig
routingConfig *conf.RoutingConfig
errorCircuitBreaker ErrorCircuitBreaker
methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker
lock sync.RWMutex
ShouldRunPassiveHealthChecks bool
client client.EthClient
Err error
clientGetter client.EthClientGetter
metricsContainer *metrics.Container
logger *zap.Logger
upstreamConfig *conf.UpstreamConfig
routingConfig *conf.RoutingConfig
errorCircuitBreaker ErrorCircuitBreaker
methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker
lock sync.RWMutex
}

func NewErrorLatencyChecker(
func NewErrorChecker(
polsar88 marked this conversation as resolved.
Show resolved Hide resolved
upstreamConfig *conf.UpstreamConfig,
routingConfig *conf.RoutingConfig,
clientGetter client.EthClientGetter,
metricsContainer *metrics.Container,
logger *zap.Logger,
) types.ErrorLatencyChecker {
c := &ErrorLatencyCheck{
upstreamConfig: upstreamConfig,
routingConfig: routingConfig,
clientGetter: clientGetter,
metricsContainer: metricsContainer,
logger: logger,
errorCircuitBreaker: NewErrorStats(routingConfig),
methodLatencyBreaker: make(map[string]LatencyCircuitBreaker),
ShouldRunPassiveHealthChecks: routingConfig.PassiveLatencyChecking && (routingConfig.Errors != nil || routingConfig.Latency != nil),
}

if err := c.InitializePassiveCheck(); err != nil {
logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig), zap.Error(err))
}

return c
return NewErrorLatencyChecker(
upstreamConfig,
routingConfig,
clientGetter,
metricsContainer,
logger,
true,
false,
)
}

func (c *ErrorLatencyCheck) InitializePassiveCheck() error {
if !c.ShouldRunPassiveHealthChecks {
return nil
}

c.logger.Debug("Initializing ErrorLatencyCheck.", zap.Any("config", c.upstreamConfig))

httpClient, err := c.clientGetter(c.upstreamConfig.HTTPURL, &c.upstreamConfig.BasicAuthConfig, &c.upstreamConfig.RequestHeadersConfig)
if err != nil {
c.Err = err
return c.Err
}

c.client = httpClient

c.runPassiveCheck()

// TODO(polsar): This check is in both PeerCheck and SyncingCheck implementations, so refactor this.
if isMethodNotSupportedErr(c.Err) {
c.logger.Debug("ErrorLatencyCheck is not supported by upstream, not running check.", zap.String("upstreamID", c.upstreamConfig.ID))

// Turn off the passive health check for this upstream.
c.ShouldRunPassiveHealthChecks = false
}

return nil
// NewLatencyChecker builds a checker through NewErrorLatencyChecker with
// latency checking enabled and error checking disabled.
func NewLatencyChecker(
	upstreamConfig *conf.UpstreamConfig,
	routingConfig *conf.RoutingConfig,
	clientGetter client.EthClientGetter,
	metricsContainer *metrics.Container,
	logger *zap.Logger,
) types.ErrorLatencyChecker {
	// Name the positional boolean flags so the call site documents itself.
	const (
		enableErrorChecking   = false
		enableLatencyChecking = true
	)

	checker := NewErrorLatencyChecker(
		upstreamConfig,
		routingConfig,
		clientGetter,
		metricsContainer,
		logger,
		enableErrorChecking,
		enableLatencyChecking,
	)

	return checker
}

func (c *ErrorLatencyCheck) RunPassiveCheck() {
if !c.ShouldRunPassiveHealthChecks {
return
func NewErrorLatencyChecker(
upstreamConfig *conf.UpstreamConfig,
routingConfig *conf.RoutingConfig,
clientGetter client.EthClientGetter,
metricsContainer *metrics.Container,
logger *zap.Logger,
enableErrorChecking bool,
enableLatencyChecking bool,
) types.ErrorLatencyChecker {
if !enableErrorChecking && !enableLatencyChecking {
panic("ErrorLatencyCheck must have at least one of error or latency checking enabled.")
}

if c.client == nil {
if err := c.InitializePassiveCheck(); err != nil {
c.logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Error(err))
c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
metrics.HTTPInit,
conf.PassiveLatencyCheckMethod,
).Inc()
}
// Create the error circuit breaker if error checking is enabled.
var errorCircuitBreaker ErrorCircuitBreaker
if enableErrorChecking {
errorCircuitBreaker = NewErrorStats(routingConfig)
}

c.runPassiveCheck()
}

func (c *ErrorLatencyCheck) runPassiveCheck() {
if c.client == nil || !c.routingConfig.PassiveLatencyChecking {
return
// Create the latency circuit breaker if latency checking is enabled.
var latencyCircuitBreaker map[string]LatencyCircuitBreaker
if enableLatencyChecking {
latencyCircuitBreaker = make(map[string]LatencyCircuitBreaker)
}

latencyConfig := c.routingConfig.Latency
if latencyConfig == nil {
return
}

var wg sync.WaitGroup
defer wg.Wait()

// Iterate over all (method, latencyThreshold) pairs and launch the check for each in parallel.
// Note that `latencyConfig.MethodLatencyThresholds` is never modified after its initialization
// in `config` package, so we don't need a lock to protect concurrent read access.
for method, latencyThreshold := range latencyConfig.MethodLatencyThresholds {
wg.Add(1)

// Passing the loop variables as arguments is required to prevent the following lint error:
// loopclosure: loop variable method captured by func literal (govet)
go func(method string, latencyThreshold time.Duration) {
defer wg.Done()

runCheck := func() {
c.runPassiveCheckForMethod(method, latencyThreshold)
}

runCheckWithMetrics(runCheck,
c.metricsContainer.ErrorLatencyCheckRequests.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod),
c.metricsContainer.ErrorLatencyCheckDuration.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod))
}(method, latencyThreshold)
return &ErrorLatencyCheck{
upstreamConfig: upstreamConfig,
routingConfig: routingConfig,
clientGetter: clientGetter,
metricsContainer: metricsContainer,
logger: logger,
errorCircuitBreaker: errorCircuitBreaker,
methodLatencyBreaker: latencyCircuitBreaker,
}
}

Expand All @@ -268,66 +234,30 @@ func (c *ErrorLatencyCheck) getLatencyCircuitBreaker(method string) LatencyCircu
return stats
}

// This method runs the passive latency check for the specified method and latency threshold.
func (c *ErrorLatencyCheck) runPassiveCheckForMethod(method string, latencyThreshold time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), RPCRequestTimeout)
defer cancel()

latencyBreaker := c.getLatencyCircuitBreaker(method)

// Make and record the request.
var duration time.Duration
duration, c.Err = c.client.RecordLatency(ctx, method)
// TODO(polsar): The error must also pass the checks specified in the config
// (i.e. match HTTP code, JSON RPC code, and error message).
// Fixing this is not a priority since we're not currently using passive health checking.
isError := c.Err != nil
c.errorCircuitBreaker.RecordResponse(isError)
latencyBreaker.RecordLatency(duration)

if isError {
c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
metrics.HTTPRequest,
method,
).Inc()
} else if duration >= latencyThreshold {
c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
metrics.HTTPRequest,
method,
).Inc()
}

c.metricsContainer.ErrorLatency.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
method,
).Set(float64(duration.Milliseconds()))

c.logger.Debug("Ran passive ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Any("latency", duration), zap.Error(c.Err))
}

func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyReason {
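// IsPassing reports whether this upstream currently passes the enabled error and latency checks for the given RPC methods.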
// TODO(polsar): Split this method into two separate methods: IsPassingError and IsPassingLatency.
func (c *ErrorLatencyCheck) IsPassing(methods []string) bool {
if !c.routingConfig.IsEnhancedRoutingControlDefined() {
return conf.ReasonUnknownOrHealthy
return true
}

if c.errorCircuitBreaker.IsOpen() {
if c.errorCircuitBreaker != nil && c.errorCircuitBreaker.IsOpen() {
c.logger.Debug(
"ErrorLatencyCheck is not passing due to too many errors.",
zap.String("upstreamID", c.upstreamConfig.ID),
zap.Error(c.Err),
)

return conf.ReasonErrorRate
return false
}

c.lock.Lock()
defer c.lock.Unlock()

if c.methodLatencyBreaker == nil {
return true
}

// Only consider the passed methods, even if other methods' circuit breakers might be open.
//
// TODO(polsar): If the circuit breaker for any of the passed methods is open, we consider this upstream
Expand All @@ -343,32 +273,38 @@ func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyR
zap.Error(c.Err),
)

return conf.ReasonLatencyTooHighRate
return false
}
}

return conf.ReasonUnknownOrHealthy
return true
}

// RecordRequest feeds the data of a completed request into whichever of the latency and error checks are enabled.
// TODO(polsar): Split this method into two separate methods: RecordError and RecordLatency.
func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) {
if c.routingConfig.PassiveLatencyChecking {
return
}

if !c.routingConfig.IsEnhancedRoutingControlDefined() {
return
}

latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method)
latencyCircuitBreaker.RecordLatency(data.Latency)
// Record the request latency if latency checking is enabled.
if c.methodLatencyBreaker != nil {
latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method)
latencyCircuitBreaker.RecordLatency(data.Latency)

if data.Latency >= latencyCircuitBreaker.GetThreshold() {
c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
metrics.HTTPRequest,
data.Method,
).Inc()
if data.Latency >= latencyCircuitBreaker.GetThreshold() {
c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues(
c.upstreamConfig.ID,
c.upstreamConfig.HTTPURL,
metrics.HTTPRequest,
data.Method,
).Inc()
}
}

// If error checking is disabled, we can return early.
if c.errorCircuitBreaker == nil {
return
}

errorString := ""
Expand Down
Loading
Loading