From ac106a17ba2166519c24408743d8404dc6bf4a49 Mon Sep 17 00:00:00 2001 From: polsar88 Date: Mon, 7 Oct 2024 13:55:24 -0700 Subject: [PATCH 1/3] feat: implementation of `AlwaysRouteFilteringStrategy` (#140) * fix: remove a TODO * fix: field alignment * fix: field alignment * fix: readability * fix: update `README.md` * feat: initial implementation of `AlwaysRouteRoutingStrategy` * fix: comment formatting * fix: comment phrasing * fix: add a TODO * fix: renaming * fix: move `getFilterTypeName` function to `node_filter` module * refactor: deduplicate filter upstreams code --- README.md | 10 + .../route/always_route_filtering_strategy.go | 70 +++++++ .../always_route_filtering_strategy_test.go | 66 +++++++ internal/route/filtering_strategy.go | 22 ++- internal/route/node_filter.go | 29 ++- internal/route/node_filter_test.go | 172 +++++++++++++++--- internal/server/object_graph.go | 2 - 7 files changed, 337 insertions(+), 34 deletions(-) create mode 100644 internal/route/always_route_filtering_strategy.go create mode 100644 internal/route/always_route_filtering_strategy_test.go diff --git a/README.md b/README.md index 3e7c4def..9dc066a2 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,16 @@ Running all tests that match regexp `TestHealthCheckManager`: go test -v -run TestHealthCheckManager ./... 
``` +Running all tests in a specific package that match regexp `Test_RemoveFilters*`: +```sh +go test -v github.com/satsuma-data/node-gateway/internal/route -run 'Test_RemoveFilters*' +``` + +Running a specific test: +```sh +go test -v github.com/satsuma-data/node-gateway/internal/route -run Test_RemoveFilters_RemoveNone +``` + To measure test code coverage, install the following tool and run the above `go test` command with the `-cover` flag: ```sh go get golang.org/x/tools/cmd/cover diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go new file mode 100644 index 00000000..1c9a01f0 --- /dev/null +++ b/internal/route/always_route_filtering_strategy.go @@ -0,0 +1,70 @@ +package route + +import ( + "github.com/satsuma-data/node-gateway/internal/metadata" + "github.com/satsuma-data/node-gateway/internal/types" + "go.uber.org/zap" +) + +type AlwaysRouteFilteringStrategy struct { + BackingStrategy RoutingStrategy + Logger *zap.Logger + NodeFilters []NodeFilter + RemovableFilters []NodeFilterType +} + +func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, +) (string, error) { + // Create a copy of removable filters to avoid modifying the original slice. + removableFilters := make([]NodeFilterType, len(s.RemovableFilters)) + copy(removableFilters, s.RemovableFilters) + + filters := s.NodeFilters // WARNING: Slice assignment, not a copy! + for len(filters) > 0 { + // Get all healthy upstreams for the current set of filters. + upstreams := filterUpstreams( + upstreamsByPriority, + requestMetadata, + filters, + s.Logger, + ) + + // If there is at least one healthy upstream, route using the backing strategy. + if len(upstreams) > 0 { + return s.BackingStrategy.RouteNextRequest(upstreams, requestMetadata) + } + + // There are no more filters to remove and no healthy upstreams found, so give up. 
+ if len(removableFilters) == 0 { + break + } + + // Remove the next filter and try again. + idx := len(removableFilters) - 1 + nextFilterToRemove := removableFilters[idx] + removableFilters = removableFilters[:idx] + filters = removeFilters(filters, nextFilterToRemove) + } + + // If all removable filters are exhausted and no healthy upstreams are found, + // pass in the original list of upstreams to the backing strategy. + // + // TODO(polsar): Eventually, we want all filters to be removable. Once that is the case, + // we should not be able to get here. + return s.BackingStrategy.RouteNextRequest(upstreamsByPriority, requestMetadata) +} + +// removeFilters returns a new slice of filters with the given filter type removed. +func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFilter { + retFilters := make([]NodeFilter, 0) + + for _, filter := range filters { + if getFilterTypeName(filter) != filterToRemove { + retFilters = append(retFilters, filter) + } + } + + return retFilters +} diff --git a/internal/route/always_route_filtering_strategy_test.go b/internal/route/always_route_filtering_strategy_test.go new file mode 100644 index 00000000..1e20efd9 --- /dev/null +++ b/internal/route/always_route_filtering_strategy_test.go @@ -0,0 +1,66 @@ +package route + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_RemoveFilters_RemoveNone(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysFail{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}, &AndFilter{}}, + removeFilters([]NodeFilter{AlwaysFail{}, &AndFilter{}}, "AlwaysPass"), + ) +} + +func Test_RemoveFilters_RemoveOne(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + 
[]NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysFail{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{&AndFilter{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysPass{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}}, "AndFilter"), + ) +} + +func Test_RemoveFilters_RemoveTwo(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{AlwaysPass{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{&AndFilter{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysPass{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}, &AndFilter{}}, "AndFilter"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}, "AlwaysPass"), + ) +} diff --git a/internal/route/filtering_strategy.go b/internal/route/filtering_strategy.go index 0c33b517..a365264b 100644 --- a/internal/route/filtering_strategy.go +++ b/internal/route/filtering_strategy.go @@ -24,16 +24,34 @@ func (s *FilteringRoutingStrategy) RouteNextRequest( func (s *FilteringRoutingStrategy) filter( upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata, +) types.PriorityToUpstreamsMap { + return filterUpstreams( + upstreamsByPriority, + requestMetadata, + []NodeFilter{s.NodeFilter}, + s.Logger, + ) +} + +func filterUpstreams( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, + nodeFilters []NodeFilter, + logger *zap.Logger, ) types.PriorityToUpstreamsMap { priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap) + nodeFilter := AndFilter{ + filters: nodeFilters, + logger: logger, + } for priority, upstreamConfigs := range upstreamsByPriority { - s.Logger.Debug("Determining healthy upstreams at priority.", 
zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs)) + logger.Debug("Determining healthy upstreams at priority.", zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs)) filteredUpstreams := make([]*config.UpstreamConfig, 0) for _, upstreamConfig := range upstreamConfigs { - ok := s.NodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) + ok := nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) if ok { filteredUpstreams = append(filteredUpstreams, upstreamConfig) } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 61f2acc8..01d1bd5c 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -1,6 +1,9 @@ package route import ( + "reflect" + "strings" + "github.com/satsuma-data/node-gateway/internal/checks" "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" @@ -10,7 +13,11 @@ import ( const DefaultMaxBlocksBehind = 10 type NodeFilter interface { - Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, numUpstreamsInPriorityGroup int) bool + Apply( + requestMetadata metadata.RequestMetadata, + upstreamConfig *config.UpstreamConfig, + numUpstreamsInPriorityGroup int, + ) bool } type AndFilter struct { @@ -334,3 +341,23 @@ const ( MaxHeightForGroup NodeFilterType = "maxHeightForGroup" MethodsAllowed NodeFilterType = "methodsAllowed" ) + +func getFilterTypeName(v interface{}) NodeFilterType { + t := reflect.TypeOf(v) + + // If it's a pointer, get the element type. + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + // Extract the name of the type and remove the package path. + typeName := t.String() + lastDotIndex := strings.LastIndex(typeName, ".") + + if lastDotIndex != -1 { + // Remove the package path, keep only the type name. 
+ typeName = typeName[lastDotIndex+1:] + } + + return NodeFilterType(typeName) +} diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 462211cc..38bb9839 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -33,24 +33,49 @@ func TestAndFilter_Apply(t *testing.T) { filters []NodeFilter } - type argsType struct { //nolint:govet // field alignment doesn't matter in tests - requestMetadata metadata.RequestMetadata + type argsType struct { upstreamConfig *config.UpstreamConfig + requestMetadata metadata.RequestMetadata } args := argsType{upstreamConfig: cfg("test-node")} - tests := []struct { //nolint:govet // field alignment doesn't matter in tests + tests := []struct { + args argsType name string fields fields - args argsType want bool }{ - {"No filters", fields{}, args, true}, - {"One passing filter", fields{[]NodeFilter{AlwaysPass{}}}, args, true}, - {"Multiple passing filters", fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysPass{}}}, args, true}, - {"One failing filter", fields{[]NodeFilter{AlwaysFail{}}}, args, false}, - {"Multiple passing and one failing", fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}}, args, false}, + { + args, + "No filters", + fields{}, + true, + }, + { + args, + "One passing filter", + fields{[]NodeFilter{AlwaysPass{}}}, + true, + }, + { + args, + "Multiple passing filters", + fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysPass{}}}, + true, + }, + { + args, + "One failing filter", + fields{[]NodeFilter{AlwaysFail{}}}, + false, + }, + { + args, + "Multiple passing and one failing", + fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}}, + false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -58,7 +83,7 @@ func TestAndFilter_Apply(t *testing.T) { filters: tt.fields.filters, } ok := a.Apply(tt.args.requestMetadata, tt.args.upstreamConfig, 1) - assert.Equalf(t, tt.want, ok, "Apply(%v, %v)", 
tt.args.requestMetadata, tt.args.upstreamConfig, 1) + assert.Equalf(t, tt.want, ok, "Apply(%v, %v)", tt.args.requestMetadata, tt.args.upstreamConfig) }) } } @@ -174,32 +199,104 @@ func TestMethodsAllowedFilter_Apply(t *testing.T) { // Batch requests stateMethodsMetadata := metadata.RequestMetadata{Methods: []string{"eth_getBalance", "eth_getBlockNumber"}} - type args struct { //nolint:govet // field alignment doesn't matter in tests - requestMetadata metadata.RequestMetadata + type args struct { upstreamConfig *config.UpstreamConfig + requestMetadata metadata.RequestMetadata } - tests := []struct { //nolint:govet // field alignment doesn't matter in tests + tests := []struct { name string args args want bool }{ - {"stateMethodFullNode", args{stateMethodMetadata, &fullNodeConfig}, false}, - {"stateMethodFullNodeWithArchiveMethodEnabled", - args{stateMethodMetadata, &fullNodeConfigWithArchiveMethodEnabled}, true}, - {"stateMethodArchiveNode", args{stateMethodMetadata, &archiveNodeConfig}, true}, - {"stateMethodArchiveNodeWithMethodDisabled", - args{stateMethodMetadata, &archiveNodeConfigWithMethodDisabled}, false}, - {"nonStateMethodFullNode", args{nonStateMethodMetadata, &fullNodeConfig}, true}, - {"nonStateMethodFullNodeWithMethodDisabled", - args{nonStateMethodMetadata, &fullNodeConfigWithFullMethodDisabled}, false}, - {"nonStateMethodArchiveNode", args{nonStateMethodMetadata, &archiveNodeConfig}, true}, - {"batchRequestsStateMethodsFullNode", args{stateMethodsMetadata, &fullNodeConfig}, false}, - {"batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", - args{stateMethodsMetadata, &fullNodeConfigWithArchiveMethodEnabled}, true}, - {"batchRequestsStateMethodsArchiveNode", args{stateMethodsMetadata, &archiveNodeConfig}, true}, - {"batchRequestsStateMethodsArchiveNodeWithMethodDisabled", - args{stateMethodsMetadata, &archiveNodeConfigWithMethodDisabled}, false}, + { + "stateMethodFullNode", + args{ + &fullNodeConfig, + stateMethodMetadata, + }, + false, + }, + 
{ + "stateMethodFullNodeWithArchiveMethodEnabled", + args{ + &fullNodeConfigWithArchiveMethodEnabled, + stateMethodMetadata, + }, + true, + }, + { + "stateMethodArchiveNode", + args{ + &archiveNodeConfig, + stateMethodMetadata, + }, + true, + }, + { + "stateMethodArchiveNodeWithMethodDisabled", + args{ + &archiveNodeConfigWithMethodDisabled, + stateMethodMetadata, + }, + false, + }, + { + "nonStateMethodFullNode", + args{ + &fullNodeConfig, + nonStateMethodMetadata, + }, + true, + }, + { + "nonStateMethodFullNodeWithMethodDisabled", + args{ + &fullNodeConfigWithFullMethodDisabled, + nonStateMethodMetadata, + }, + false, + }, + { + "nonStateMethodArchiveNode", + args{ + &archiveNodeConfig, + nonStateMethodMetadata, + }, + true, + }, + { + "batchRequestsStateMethodsFullNode", + args{ + &fullNodeConfig, + stateMethodsMetadata, + }, + false, + }, + { + "batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", + args{ + &fullNodeConfigWithArchiveMethodEnabled, + stateMethodsMetadata, + }, + true, + }, + { + "batchRequestsStateMethodsArchiveNode", + args{ + &archiveNodeConfig, + stateMethodsMetadata, + }, + true, + }, + { + "batchRequestsStateMethodsArchiveNodeWithMethodDisabled", + args{ + &archiveNodeConfigWithMethodDisabled, + stateMethodsMetadata, + }, + false, + }, } for _, tt := range tests { @@ -218,3 +315,20 @@ func emitBlockHeight(store *metadata.ChainMetadataStore, groupID, upstreamID str func emitError(store *metadata.ChainMetadataStore, groupID, upstreamID string, err error) { store.ProcessErrorUpdate(groupID, upstreamID, err) } + +func Test_getFilterTypeName(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + NodeFilterType("AlwaysPass"), + getFilterTypeName(AlwaysPass{}), + ) + Assert.Equal( + NodeFilterType("AlwaysFail"), + getFilterTypeName(AlwaysFail{}), + ) + Assert.Equal( + NodeFilterType("AndFilter"), + getFilterTypeName(&AndFilter{}), + ) +} diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index 
68a56224..9839362f 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -52,8 +52,6 @@ func wireSingleChainDependencies( alwaysRoute = *chainConfig.Routing.AlwaysRoute } - // TODO(polsar): Here, the HealthCheckManager is wired into the primary FilteringRoutingStrategy. - // We may need to wire it into the secondary PriorityRoundRobinStrategy as well. enabledNodeFilters := []route.NodeFilterType{ route.Healthy, route.MaxHeightForGroup, From 6829d5eaf8931ce8bcb5467bbf4f3b77351e87ea Mon Sep 17 00:00:00 2001 From: polsar88 Date: Mon, 7 Oct 2024 14:05:32 -0700 Subject: [PATCH 2/3] feat: use `AlwaysRouteFilteringStrategy` if `alwaysRoute` is enabled (#141) * fix: remove a TODO * fix: field alignment * fix: field alignment * fix: readability * fix: update `README.md` * feat: initial implementation of `AlwaysRouteRoutingStrategy` * fix: comment formatting * fix: comment phrasing * fix: add a TODO * fix: renaming * fix: revert to the previous version of `PriorityRoundRobinStrategy` * fix: remove `HealthStatus` field * feat: LatencyChecker can now be configured to only check latency, errors, or both * feat: create separate instances of LatencyChecker for error and latency checking * feat: plumbing for `AlwaysRouteFilteringStrategy` * fix: tests * fix: removable filters * feat: add debug logging * fix: move `getFilterTypeName` function to `node_filter` module * refactor: deduplicate filter upstreams code * refactor: remove passive error/latency checker code --- internal/checks/checks_test.go | 5 +- internal/checks/errorlatency.go | 260 +++++++----------- internal/checks/errorlatency_test.go | 53 ---- internal/checks/manager.go | 43 ++- internal/config/config.go | 60 +--- internal/mocks/ErrorLatencyChecker.go | 64 +---- .../route/always_route_filtering_strategy.go | 8 +- internal/route/node_filter.go | 44 ++- internal/route/node_filter_test.go | 8 +- internal/route/routing_strategy.go | 133 +-------- internal/route/routing_strategy_test.go | 6 
+- internal/server/object_graph.go | 50 +++- internal/server/web_server_e2e_test.go | 15 +- internal/types/types.go | 4 +- 14 files changed, 256 insertions(+), 497 deletions(-) diff --git a/internal/checks/checks_test.go b/internal/checks/checks_test.go index dd501d2c..3bd1a868 100644 --- a/internal/checks/checks_test.go +++ b/internal/checks/checks_test.go @@ -16,9 +16,8 @@ var defaultUpstreamConfig = &config.UpstreamConfig{ } var defaultRoutingConfig = &config.RoutingConfig{ - PassiveLatencyChecking: true, - DetectionWindow: config.NewDuration(10 * time.Minute), - BanWindow: config.NewDuration(50 * time.Minute), + DetectionWindow: config.NewDuration(10 * time.Minute), + BanWindow: config.NewDuration(50 * time.Minute), Errors: &config.ErrorsConfig{ Rate: 0.25, HTTPCodes: []string{ diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index 44aa1393..ab50c959 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -1,7 +1,6 @@ package checks import ( - "context" "math" "net/http" "strconv" @@ -129,125 +128,92 @@ func NewCircuitBreaker( Build() } +// ErrorLatencyCheck +// Error checking is disabled if `errorCircuitBreaker` is nil. +// Latency checking is disabled if `methodLatencyBreaker` is nil. +// At least one of these two must be non-nil. 
type ErrorLatencyCheck struct { - client client.EthClient - Err error - clientGetter client.EthClientGetter - metricsContainer *metrics.Container - logger *zap.Logger - upstreamConfig *conf.UpstreamConfig - routingConfig *conf.RoutingConfig - errorCircuitBreaker ErrorCircuitBreaker - methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker - lock sync.RWMutex - ShouldRunPassiveHealthChecks bool + client client.EthClient + Err error + clientGetter client.EthClientGetter + metricsContainer *metrics.Container + logger *zap.Logger + upstreamConfig *conf.UpstreamConfig + routingConfig *conf.RoutingConfig + errorCircuitBreaker ErrorCircuitBreaker + methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker + lock sync.RWMutex } -func NewErrorLatencyChecker( +func NewErrorChecker( upstreamConfig *conf.UpstreamConfig, routingConfig *conf.RoutingConfig, clientGetter client.EthClientGetter, metricsContainer *metrics.Container, logger *zap.Logger, ) types.ErrorLatencyChecker { - c := &ErrorLatencyCheck{ - upstreamConfig: upstreamConfig, - routingConfig: routingConfig, - clientGetter: clientGetter, - metricsContainer: metricsContainer, - logger: logger, - errorCircuitBreaker: NewErrorStats(routingConfig), - methodLatencyBreaker: make(map[string]LatencyCircuitBreaker), - ShouldRunPassiveHealthChecks: routingConfig.PassiveLatencyChecking && (routingConfig.Errors != nil || routingConfig.Latency != nil), - } - - if err := c.InitializePassiveCheck(); err != nil { - logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig), zap.Error(err)) - } - - return c + return NewErrorLatencyChecker( + upstreamConfig, + routingConfig, + clientGetter, + metricsContainer, + logger, + true, + false, + ) } -func (c *ErrorLatencyCheck) InitializePassiveCheck() error { - if !c.ShouldRunPassiveHealthChecks { - return nil - } - - c.logger.Debug("Initializing ErrorLatencyCheck.", zap.Any("config", 
c.upstreamConfig)) - - httpClient, err := c.clientGetter(c.upstreamConfig.HTTPURL, &c.upstreamConfig.BasicAuthConfig, &c.upstreamConfig.RequestHeadersConfig) - if err != nil { - c.Err = err - return c.Err - } - - c.client = httpClient - - c.runPassiveCheck() - - // TODO(polsar): This check is in both PeerCheck and SyncingCheck implementations, so refactor this. - if isMethodNotSupportedErr(c.Err) { - c.logger.Debug("ErrorLatencyCheck is not supported by upstream, not running check.", zap.String("upstreamID", c.upstreamConfig.ID)) - - // Turn off the passive health check for this upstream. - c.ShouldRunPassiveHealthChecks = false - } - - return nil +func NewLatencyChecker( + upstreamConfig *conf.UpstreamConfig, + routingConfig *conf.RoutingConfig, + clientGetter client.EthClientGetter, + metricsContainer *metrics.Container, + logger *zap.Logger, +) types.ErrorLatencyChecker { + return NewErrorLatencyChecker( + upstreamConfig, + routingConfig, + clientGetter, + metricsContainer, + logger, + false, + true, + ) } -func (c *ErrorLatencyCheck) RunPassiveCheck() { - if !c.ShouldRunPassiveHealthChecks { - return +func NewErrorLatencyChecker( + upstreamConfig *conf.UpstreamConfig, + routingConfig *conf.RoutingConfig, + clientGetter client.EthClientGetter, + metricsContainer *metrics.Container, + logger *zap.Logger, + enableErrorChecking bool, + enableLatencyChecking bool, +) types.ErrorLatencyChecker { + if !enableErrorChecking && !enableLatencyChecking { + panic("ErrorLatencyCheck must have at least one of error or latency checking enabled.") } - if c.client == nil { - if err := c.InitializePassiveCheck(); err != nil { - c.logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Error(err)) - c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPInit, - conf.PassiveLatencyCheckMethod, - ).Inc() - } + // Create the error circuit breaker if error checking is 
enabled. + var errorCircuitBreaker ErrorCircuitBreaker + if enableErrorChecking { + errorCircuitBreaker = NewErrorStats(routingConfig) } - c.runPassiveCheck() -} - -func (c *ErrorLatencyCheck) runPassiveCheck() { - if c.client == nil || !c.routingConfig.PassiveLatencyChecking { - return + // Create the latency circuit breaker if latency checking is enabled. + var latencyCircuitBreaker map[string]LatencyCircuitBreaker + if enableLatencyChecking { + latencyCircuitBreaker = make(map[string]LatencyCircuitBreaker) } - latencyConfig := c.routingConfig.Latency - if latencyConfig == nil { - return - } - - var wg sync.WaitGroup - defer wg.Wait() - - // Iterate over all (method, latencyThreshold) pairs and launch the check for each in parallel. - // Note that `latencyConfig.MethodLatencyThresholds` is never modified after its initialization - // in `config` package, so we don't need a lock to protect concurrent read access. - for method, latencyThreshold := range latencyConfig.MethodLatencyThresholds { - wg.Add(1) - - // Passing the loop variables as arguments is required to prevent the following lint error: - // loopclosure: loop variable method captured by func literal (govet) - go func(method string, latencyThreshold time.Duration) { - defer wg.Done() - - runCheck := func() { - c.runPassiveCheckForMethod(method, latencyThreshold) - } - - runCheckWithMetrics(runCheck, - c.metricsContainer.ErrorLatencyCheckRequests.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod), - c.metricsContainer.ErrorLatencyCheckDuration.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod)) - }(method, latencyThreshold) + return &ErrorLatencyCheck{ + upstreamConfig: upstreamConfig, + routingConfig: routingConfig, + clientGetter: clientGetter, + metricsContainer: metricsContainer, + logger: logger, + errorCircuitBreaker: errorCircuitBreaker, + methodLatencyBreaker: latencyCircuitBreaker, } } @@ -268,66 +234,30 @@ 
func (c *ErrorLatencyCheck) getLatencyCircuitBreaker(method string) LatencyCircu return stats } -// This method runs the passive latency check for the specified method and latency threshold. -func (c *ErrorLatencyCheck) runPassiveCheckForMethod(method string, latencyThreshold time.Duration) { - ctx, cancel := context.WithTimeout(context.Background(), RPCRequestTimeout) - defer cancel() - - latencyBreaker := c.getLatencyCircuitBreaker(method) - - // Make and record the request. - var duration time.Duration - duration, c.Err = c.client.RecordLatency(ctx, method) - // TODO(polsar): The error must also pass the checks specified in the config - // (i.e. match HTTP code, JSON RPC code, and error message). - // Fixing this is not a priority since we're not currently using passive health checking. - isError := c.Err != nil - c.errorCircuitBreaker.RecordResponse(isError) - latencyBreaker.RecordLatency(duration) - - if isError { - c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - method, - ).Inc() - } else if duration >= latencyThreshold { - c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - method, - ).Inc() - } - - c.metricsContainer.ErrorLatency.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - method, - ).Set(float64(duration.Milliseconds())) - - c.logger.Debug("Ran passive ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Any("latency", duration), zap.Error(c.Err)) -} - -func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyReason { +// IsPassing +// TODO(polsar): Split this method into two separate methods: IsPassingError and IsPassingLatency. 
+func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { if !c.routingConfig.IsEnhancedRoutingControlDefined() { - return conf.ReasonUnknownOrHealthy + return true } - if c.errorCircuitBreaker.IsOpen() { + if c.errorCircuitBreaker != nil && c.errorCircuitBreaker.IsOpen() { c.logger.Debug( "ErrorLatencyCheck is not passing due to too many errors.", zap.String("upstreamID", c.upstreamConfig.ID), zap.Error(c.Err), ) - return conf.ReasonErrorRate + return false } c.lock.Lock() defer c.lock.Unlock() + if c.methodLatencyBreaker == nil { + return true + } + // Only consider the passed methods, even if other methods' circuit breakers might be open. // // TODO(polsar): If the circuit breaker for any of the passed methods is open, we consider this upstream @@ -343,32 +273,38 @@ func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyR zap.Error(c.Err), ) - return conf.ReasonLatencyTooHighRate + return false } } - return conf.ReasonUnknownOrHealthy + return true } +// RecordRequest +// TODO(polsar): Split this method into two separate methods: RecordError and RecordLatency. func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { - if c.routingConfig.PassiveLatencyChecking { - return - } - if !c.routingConfig.IsEnhancedRoutingControlDefined() { return } - latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) - latencyCircuitBreaker.RecordLatency(data.Latency) + // Record the request latency if latency checking is enabled. 
+ if c.methodLatencyBreaker != nil { + latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) + latencyCircuitBreaker.RecordLatency(data.Latency) - if data.Latency >= latencyCircuitBreaker.GetThreshold() { - c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - data.Method, - ).Inc() + if data.Latency >= latencyCircuitBreaker.GetThreshold() { + c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + metrics.HTTPRequest, + data.Method, + ).Inc() + } + } + + // If error checking is disabled, we can return early. + if c.errorCircuitBreaker == nil { + return } errorString := "" diff --git a/internal/checks/errorlatency_test.go b/internal/checks/errorlatency_test.go index 735250d0..de89ee5c 100644 --- a/internal/checks/errorlatency_test.go +++ b/internal/checks/errorlatency_test.go @@ -2,63 +2,10 @@ package checks import ( "testing" - "time" - "github.com/satsuma-data/node-gateway/internal/client" - "github.com/satsuma-data/node-gateway/internal/config" - "github.com/satsuma-data/node-gateway/internal/metrics" - "github.com/satsuma-data/node-gateway/internal/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "go.uber.org/zap" ) -func helperTestLatencyChecker(t *testing.T, latency1, latency2 time.Duration, reason config.UnhealthyReason) { - t.Helper() - - methods := []string{"eth_call", "eth_getLogs"} - - ethClient := mocks.NewEthClient(t) - ethClient.EXPECT().RecordLatency(mock.Anything, methods[0]).Return(latency1, nil) - ethClient.EXPECT().RecordLatency(mock.Anything, methods[1]).Return(latency2, nil) - - mockEthClientGetter := func(url string, credentials *config.BasicAuthConfig, additionalRequestHeaders *[]config.RequestHeaderConfig) (client.EthClient, error) { //nolint:nolintlint,revive // Legacy - return ethClient, nil - } - - checker := NewErrorLatencyChecker( - 
defaultUpstreamConfig, - defaultRoutingConfig, - mockEthClientGetter, - metrics.NewContainer(config.TestChainName), - zap.L(), - ) - - assert.Equal(t, reason, checker.GetUnhealthyReason(methods)) - - ethClient.AssertNumberOfCalls(t, "RecordLatency", 2) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesLessThanThreshold(t *testing.T) { - helperTestLatencyChecker(t, 2*time.Millisecond, 3*time.Millisecond, config.ReasonUnknownOrHealthy) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesJustBelowThreshold(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, (2000-1)*time.Millisecond, config.ReasonUnknownOrHealthy) -} - -func TestLatencyChecker_TwoMethods_FirstLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10000*time.Millisecond, (2000-1)*time.Millisecond, config.ReasonLatencyTooHighRate) -} - -func TestLatencyChecker_TwoMethods_SecondLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, 2000*time.Millisecond, config.ReasonLatencyTooHighRate) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10002*time.Millisecond, 2003*time.Millisecond, config.ReasonLatencyTooHighRate) -} - func Test_isMatchForPatterns_True(t *testing.T) { Assert := assert.New(t) diff --git a/internal/checks/manager.go b/internal/checks/manager.go index c0c420ff..18ce65f9 100644 --- a/internal/checks/manager.go +++ b/internal/checks/manager.go @@ -56,6 +56,13 @@ type healthCheckManager struct { *metrics.Container, *zap.Logger, ) types.Checker + newErrorCheck func( + *conf.UpstreamConfig, + *conf.RoutingConfig, + client.EthClientGetter, + *metrics.Container, + *zap.Logger, + ) types.ErrorLatencyChecker newLatencyCheck func( *conf.UpstreamConfig, *conf.RoutingConfig, @@ -92,7 +99,8 @@ func NewHealthCheckManager( newBlockHeightCheck: NewBlockHeightChecker, newPeerCheck: NewPeerChecker, newSyncingCheck: NewSyncingChecker, - newLatencyCheck: NewErrorLatencyChecker, + 
newErrorCheck: NewErrorChecker, + newLatencyCheck: NewLatencyChecker, blockHeightObserver: blockHeightObserver, healthCheckTicker: healthCheckTicker, metricsContainer: metricsContainer, @@ -118,7 +126,16 @@ func (h *healthCheckManager) GetUpstreamStatus(upstreamID string) *types.Upstrea panic(fmt.Sprintf("Upstream ID %s not found!", upstreamID)) } +func (h *healthCheckManager) GetErrorCheck(upstreamID string) types.ErrorLatencyChecker { + return h.GetUpstreamStatus(upstreamID).ErrorCheck +} + +func (h *healthCheckManager) GetLatencyCheck(upstreamID string) types.ErrorLatencyChecker { + return h.GetUpstreamStatus(upstreamID).LatencyCheck +} + func (h *healthCheckManager) RecordRequest(upstreamID string, data *types.RequestData) { + h.GetUpstreamStatus(upstreamID).ErrorCheck.RecordRequest(data) h.GetUpstreamStatus(upstreamID).LatencyCheck.RecordRequest(data) } @@ -188,6 +205,22 @@ func (h *healthCheckManager) initializeChecks() { ) }() + var errorCheck types.ErrorLatencyChecker + + innerWG.Add(1) + + go func() { + defer innerWG.Done() + + errorCheck = h.newErrorCheck( + &config, + &h.routingConfig, + client.NewEthClient, + h.metricsContainer, + h.logger, + ) + }() + var latencyCheck types.ErrorLatencyChecker innerWG.Add(1) @@ -213,6 +246,7 @@ func (h *healthCheckManager) initializeChecks() { BlockHeightCheck: blockHeightCheck, PeerCheck: peerCheck, SyncingCheck: syncingCheck, + ErrorCheck: errorCheck, LatencyCheck: latencyCheck, }) mutex.Unlock() @@ -257,13 +291,6 @@ func (h *healthCheckManager) runChecksOnce() { defer wg.Done() c.RunCheck() }(h.GetUpstreamStatus(config.ID).SyncingCheck) - - wg.Add(1) - - go func(c types.ErrorLatencyChecker) { - defer wg.Done() - c.RunPassiveCheck() - }(h.GetUpstreamStatus(config.ID).LatencyCheck) } wg.Wait() diff --git a/internal/config/config.go b/internal/config/config.go index 0591ab78..d57aeb8d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,37 +14,15 @@ import ( type NodeType string const ( - 
DefaultBanWindow = 5 * time.Minute - DefaultDetectionWindow = time.Minute - // DefaultMaxLatency is used when the latency threshold is not specified in the config. - // TODO(polsar): We should probably use a lower value. - DefaultMaxLatency = 10 * time.Second + DefaultBanWindow = 5 * time.Minute + DefaultDetectionWindow = time.Minute + DefaultMaxLatency = 10 * time.Second // Default latency threshold DefaultErrorRate = 0.25 DefaultLatencyTooHighRate = 0.5 // TODO(polsar): Expose this parameter in the config. Archive NodeType = "archive" Full NodeType = "full" - // PassiveLatencyCheckMethod is a dummy method we use to measure the latency of an upstream RPC endpoint. - // https://docs.infura.io/api/networks/ethereum/json-rpc-methods/eth_chainid - PassiveLatencyCheckMethod = "eth_chainId" //nolint:gosec // No hardcoded credentials here. - // PassiveLatencyChecking indicates whether to use live (active) requests as data for the LatencyChecker (false) - // or synthetic (passive) periodic requests (true). - // TODO(polsar): This setting is currently not configurable via the YAML config file. - // TODO(polsar): We may also consider a hybrid request latency/error checking using both active and passive requests. - PassiveLatencyChecking = false ) -// UnhealthyReason is the reason why a health check failed. We use it to select the most appropriate upstream to route to -// if all upstreams are unhealthy and the `alwaysRoute` option is true. -type UnhealthyReason int - -const ( - ReasonUnknownOrHealthy = iota - ReasonErrorRate - ReasonLatencyTooHighRate -) - -// UpstreamConfig -// TODO(polsar): Move the HealthStatus field to a new struct and embed this struct in it. 
Asana task: https://app.asana.com/0/1207397277805097/1208232039997185/f type UpstreamConfig struct { Methods MethodsConfig `yaml:"methods"` HealthCheckConfig HealthCheckConfig `yaml:"healthCheck"` @@ -55,7 +33,6 @@ type UpstreamConfig struct { GroupID string `yaml:"group"` NodeType NodeType `yaml:"nodeType"` RequestHeadersConfig []RequestHeaderConfig `yaml:"requestHeaders"` - HealthStatus UnhealthyReason // The default value of this field is 0 (ReasonUnknownOrHealthy). } func (c *UpstreamConfig) isValid(groups []GroupConfig) bool { @@ -262,12 +239,7 @@ type MethodConfig struct { Threshold time.Duration `yaml:"threshold"` } -func (c *MethodConfig) isMethodConfigValid(passiveLatencyChecking bool) bool { - if passiveLatencyChecking { - // TODO(polsar): Validate the method name: https://ethereum.org/en/developers/docs/apis/json-rpc/ - return !strings.EqualFold(c.Name, PassiveLatencyCheckMethod) - } - +func (c *MethodConfig) isMethodConfigValid() bool { return true } @@ -358,13 +330,13 @@ func (c *LatencyConfig) initialize(globalConfig *RoutingConfig) { } } -func (c *LatencyConfig) isLatencyConfigValid(passiveLatencyChecking bool) bool { +func (c *LatencyConfig) isLatencyConfigValid() bool { if c == nil { return true } for _, method := range c.Methods { - if !method.isMethodConfigValid(passiveLatencyChecking) { + if !method.isMethodConfigValid() { return false } } @@ -373,14 +345,13 @@ func (c *LatencyConfig) isLatencyConfigValid(passiveLatencyChecking bool) bool { } type RoutingConfig struct { - AlwaysRoute *bool `yaml:"alwaysRoute"` - Errors *ErrorsConfig `yaml:"errors"` - Latency *LatencyConfig `yaml:"latency"` - DetectionWindow *time.Duration `yaml:"detectionWindow"` - BanWindow *time.Duration `yaml:"banWindow"` - MaxBlocksBehind int `yaml:"maxBlocksBehind"` - PassiveLatencyChecking bool - IsInitialized bool + AlwaysRoute *bool `yaml:"alwaysRoute"` + Errors *ErrorsConfig `yaml:"errors"` + Latency *LatencyConfig `yaml:"latency"` + DetectionWindow *time.Duration 
`yaml:"detectionWindow"` + BanWindow *time.Duration `yaml:"banWindow"` + MaxBlocksBehind int `yaml:"maxBlocksBehind"` + IsInitialized bool } // IsEnhancedRoutingControlDefined returns true iff any of the enhanced routing control fields are specified @@ -400,8 +371,6 @@ func (r *RoutingConfig) setDefaults(globalConfig *RoutingConfig, force bool) boo return true } - r.PassiveLatencyChecking = PassiveLatencyChecking - if !force && !r.IsEnhancedRoutingControlDefined() && (globalConfig == nil || !globalConfig.IsEnhancedRoutingControlDefined()) { // Routing config is not specified at either this or global level, so there is nothing to do. return false @@ -475,7 +444,7 @@ func (r *RoutingConfig) isRoutingConfigValid() bool { latency := r.Latency if latency != nil { - isValid = isValid && latency.isLatencyConfigValid(r.PassiveLatencyChecking) + isValid = isValid && latency.isLatencyConfigValid() } return isValid @@ -544,7 +513,6 @@ func (c *SingleChainConfig) isValid() bool { func (c *SingleChainConfig) setDefaults(globalConfig *GlobalConfig, isGlobalRoutingConfigSpecified bool) { if !isGlobalRoutingConfigSpecified && !c.Routing.IsEnhancedRoutingControlDefined() { - c.Routing.PassiveLatencyChecking = PassiveLatencyChecking return } diff --git a/internal/mocks/ErrorLatencyChecker.go b/internal/mocks/ErrorLatencyChecker.go index 312395b1..dcd8dfc7 100644 --- a/internal/mocks/ErrorLatencyChecker.go +++ b/internal/mocks/ErrorLatencyChecker.go @@ -3,10 +3,8 @@ package mocks import ( - config "github.com/satsuma-data/node-gateway/internal/config" - mock "github.com/stretchr/testify/mock" - types "github.com/satsuma-data/node-gateway/internal/types" + mock "github.com/stretchr/testify/mock" ) // ErrorLatencyChecker is an autogenerated mock type for the ErrorLatencyChecker type @@ -22,48 +20,48 @@ func (_m *ErrorLatencyChecker) EXPECT() *ErrorLatencyChecker_Expecter { return &ErrorLatencyChecker_Expecter{mock: &_m.Mock} } -// GetUnhealthyReason provides a mock function with given 
fields: methods -func (_m *ErrorLatencyChecker) GetUnhealthyReason(methods []string) config.UnhealthyReason { +// IsPassing provides a mock function with given fields: methods +func (_m *ErrorLatencyChecker) IsPassing(methods []string) bool { ret := _m.Called(methods) if len(ret) == 0 { - panic("no return value specified for GetUnhealthyReason") + panic("no return value specified for IsPassing") } - var r0 config.UnhealthyReason - if rf, ok := ret.Get(0).(func([]string) config.UnhealthyReason); ok { + var r0 bool + if rf, ok := ret.Get(0).(func([]string) bool); ok { r0 = rf(methods) } else { - r0 = ret.Get(0).(config.UnhealthyReason) + r0 = ret.Get(0).(bool) } return r0 } -// ErrorLatencyChecker_GetUnhealthyReason_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetUnhealthyReason' -type ErrorLatencyChecker_GetUnhealthyReason_Call struct { +// ErrorLatencyChecker_IsPassing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsPassing' +type ErrorLatencyChecker_IsPassing_Call struct { *mock.Call } -// GetUnhealthyReason is a helper method to define mock.On call +// IsPassing is a helper method to define mock.On call // - methods []string -func (_e *ErrorLatencyChecker_Expecter) GetUnhealthyReason(methods interface{}) *ErrorLatencyChecker_GetUnhealthyReason_Call { - return &ErrorLatencyChecker_GetUnhealthyReason_Call{Call: _e.mock.On("GetUnhealthyReason", methods)} +func (_e *ErrorLatencyChecker_Expecter) IsPassing(methods interface{}) *ErrorLatencyChecker_IsPassing_Call { + return &ErrorLatencyChecker_IsPassing_Call{Call: _e.mock.On("IsPassing", methods)} } -func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) Run(run func(methods []string)) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) Run(run func(methods []string)) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].([]string)) }) return _c } 
-func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) Return(_a0 config.UnhealthyReason) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) Return(_a0 bool) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Return(_a0) return _c } -func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) RunAndReturn(run func([]string) config.UnhealthyReason) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) RunAndReturn(run func([]string) bool) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Return(run) return _c } @@ -101,38 +99,6 @@ func (_c *ErrorLatencyChecker_RecordRequest_Call) RunAndReturn(run func(*types.R return _c } -// RunPassiveCheck provides a mock function with given fields: -func (_m *ErrorLatencyChecker) RunPassiveCheck() { - _m.Called() -} - -// ErrorLatencyChecker_RunPassiveCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunPassiveCheck' -type ErrorLatencyChecker_RunPassiveCheck_Call struct { - *mock.Call -} - -// RunPassiveCheck is a helper method to define mock.On call -func (_e *ErrorLatencyChecker_Expecter) RunPassiveCheck() *ErrorLatencyChecker_RunPassiveCheck_Call { - return &ErrorLatencyChecker_RunPassiveCheck_Call{Call: _e.mock.On("RunPassiveCheck")} -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) Run(run func()) *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) Return() *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Return() - return _c -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) RunAndReturn(run func()) *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Return(run) - return _c -} - // NewErrorLatencyChecker creates a new instance of ErrorLatencyChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
// The first argument is typically a *testing.T value. func NewErrorLatencyChecker(t interface { diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 1c9a01f0..450c8114 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -46,6 +46,12 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( nextFilterToRemove := removableFilters[idx] removableFilters = removableFilters[:idx] filters = removeFilters(filters, nextFilterToRemove) + s.Logger.Debug( + "Removed a filter.", + zap.Any("upstreamsByPriority", upstreamsByPriority), + zap.Any("removedFilter", nextFilterToRemove), + zap.Any("remainingFilters", filters), + ) } // If all removable filters are exhausted and no healthy upstreams are found, @@ -61,7 +67,7 @@ func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFi retFilters := make([]NodeFilter, 0) for _, filter := range filters { - if getFilterTypeName(filter) != filterToRemove { + if GetFilterTypeName(filter) != filterToRemove { retFilters = append(retFilters, filter) } } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 01d1bd5c..5c7ac9e3 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -114,28 +114,26 @@ func (f *IsDoneSyncing) Apply(_ metadata.RequestMetadata, upstreamConfig *config return true } +type IsErrorRateAcceptable struct { + HealthCheckManager checks.HealthCheckManager +} + +func (f *IsErrorRateAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { + upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) + errorCheck, _ := upstreamStatus.ErrorCheck.(*checks.ErrorLatencyCheck) + + return errorCheck.IsPassing(requestMetadata.Methods) +} + type IsLatencyAcceptable struct { - healthCheckManager checks.HealthCheckManager - logger *zap.Logger + 
HealthCheckManager checks.HealthCheckManager } func (f *IsLatencyAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { - upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) - + upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) latencyCheck, _ := upstreamStatus.LatencyCheck.(*checks.ErrorLatencyCheck) - // TODO(polsar): If unhealthy, set the delta by which the check failed. - // For example, if the configured rate is 0.25 and the current error rate is 0.27, - // the delta is 0.27 - 0.25 = 0.02. This value can be used to rank upstreams by the - // degree to which they are unhealthy. This would help us choose the upstream to - // route to if all upstreams are unhealthy AND `alwaysRoute` config option is true. - upstreamConfig.HealthStatus = latencyCheck.GetUnhealthyReason(requestMetadata.Methods) - - // TODO(polsar): Note that for ErrorLatencyCheck only, we always return true. The health status - // of the upstream is instead contained in the struct's HealthStatus field. This is a bit - // clunky. Eventually, we want to change the signature of the Apply method. This will require - // significant refactoring. 
- return true + return latencyCheck.IsPassing(requestMetadata.Methods) } type IsCloseToGlobalMaxHeight struct { @@ -298,15 +296,9 @@ func CreateSingleNodeFilter( minimumPeerCount: checks.MinimumPeerCount, } - isLatencyAcceptable := IsLatencyAcceptable{ - healthCheckManager: manager, - logger: logger, - } - return &AndFilter{ filters: []NodeFilter{ &hasEnoughPeers, - &isLatencyAcceptable, }, logger: logger, } @@ -328,6 +320,10 @@ func CreateSingleNodeFilter( } case MethodsAllowed: return &AreMethodsAllowed{logger: logger} + case ErrorRateAcceptable: + panic("ErrorRateAcceptable filter is not implemented!") + case LatencyAcceptable: + panic("LatencyAcceptable filter is not implemented!") default: panic("Unknown filter type " + filterName + "!") } @@ -340,9 +336,11 @@ const ( NearGlobalMaxHeight NodeFilterType = "nearGlobalMaxHeight" MaxHeightForGroup NodeFilterType = "maxHeightForGroup" MethodsAllowed NodeFilterType = "methodsAllowed" + ErrorRateAcceptable NodeFilterType = "errorRateAcceptable" + LatencyAcceptable NodeFilterType = "latencyAcceptable" ) -func getFilterTypeName(v interface{}) NodeFilterType { +func GetFilterTypeName(v interface{}) NodeFilterType { t := reflect.TypeOf(v) // If it's a pointer, get the element type. 
diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 38bb9839..151b0b25 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -316,19 +316,19 @@ func emitError(store *metadata.ChainMetadataStore, groupID, upstreamID string, e store.ProcessErrorUpdate(groupID, upstreamID, err) } -func Test_getFilterTypeName(t *testing.T) { +func TestGetFilterTypeName(t *testing.T) { Assert := assert.New(t) Assert.Equal( NodeFilterType("AlwaysPass"), - getFilterTypeName(AlwaysPass{}), + GetFilterTypeName(AlwaysPass{}), ) Assert.Equal( NodeFilterType("AlwaysFail"), - getFilterTypeName(AlwaysFail{}), + GetFilterTypeName(AlwaysFail{}), ) Assert.Equal( NodeFilterType("AndFilter"), - getFilterTypeName(&AndFilter{}), + GetFilterTypeName(&AndFilter{}), ) } diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index df43d416..1a8a4439 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -1,11 +1,9 @@ package route import ( - "slices" "sort" "sync/atomic" - conf "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "go.uber.org/zap" @@ -20,18 +18,15 @@ type RoutingStrategy interface { requestMetadata metadata.RequestMetadata, ) (string, error) } - type PriorityRoundRobinStrategy struct { - logger *zap.Logger - counter uint64 - alwaysRoute bool + logger *zap.Logger + counter uint64 } -func NewPriorityRoundRobinStrategy(logger *zap.Logger, alwaysRoute bool) *PriorityRoundRobinStrategy { +func NewPriorityRoundRobinStrategy(logger *zap.Logger) *PriorityRoundRobinStrategy { return &PriorityRoundRobinStrategy{ - logger: logger, - counter: 0, - alwaysRoute: alwaysRoute, + logger: logger, + counter: 0, } } @@ -49,23 +44,11 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( upstreamsByPriority types.PriorityToUpstreamsMap, _ 
metadata.RequestMetadata, ) (string, error) { - statusToUpstreamsByPriority := partitionUpstreams(upstreamsByPriority) - - var healthyUpstreamsByPriority types.PriorityToUpstreamsMap - - var exists bool - - if healthyUpstreamsByPriority, exists = statusToUpstreamsByPriority[conf.ReasonUnknownOrHealthy]; !exists { - // There are no healthy upstreams. - healthyUpstreamsByPriority = make(types.PriorityToUpstreamsMap) - } - - prioritySorted := maps.Keys(healthyUpstreamsByPriority) + prioritySorted := maps.Keys(upstreamsByPriority) sort.Ints(prioritySorted) - // Note that `prioritySorted` can be empty, in which case the body of this loop will not be executed even once. for _, priority := range prioritySorted { - upstreams := healthyUpstreamsByPriority[priority] + upstreams := upstreamsByPriority[priority] if len(upstreams) > 0 { atomic.AddUint64(&s.counter, 1) @@ -76,107 +59,5 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( s.logger.Debug("Did not find any healthy nodes in priority.", zap.Int("priority", priority)) } - // No healthy upstreams are available. If `alwaysRoute` is true, find an unhealthy upstream to route to anyway. - // TODO(polsar): At this time, the only unhealthy upstreams that can end up here are those due to high latency - // or error rate. Pass ALL configured upstreams in `upstreamsByPriority`. Their health status should be indicated - // in the UpstreamConfig.HealthStatus field. - if s.alwaysRoute { - s.logger.Warn("No healthy upstreams found but `alwaysRoute` is set to true.") - - // If available, return an upstream that's unhealthy due to high latency rate. - if upstreamsByPriorityLatencyUnhealthy, ok := statusToUpstreamsByPriority[conf.ReasonLatencyTooHighRate]; ok { - upstream := getHighestPriorityUpstream(upstreamsByPriorityLatencyUnhealthy) - if upstream == nil { - // This indicates a non-recoverable bug in the code. 
- panic("Upstream not found!") - } - - s.logger.Info( - "Routing to an upstream with high latency.", - zap.String("ID", upstream.ID), - zap.String("GroupID", upstream.GroupID), - zap.String("HTTPURL", upstream.HTTPURL), - zap.String("WSURL", upstream.WSURL), - ) - - return upstream.ID, nil - } - - // If available, return an upstream that's unhealthy due to high error rate. - if upstreamsByPriorityErrorUnhealthy, ok := statusToUpstreamsByPriority[conf.ReasonErrorRate]; ok { - upstream := getHighestPriorityUpstream(upstreamsByPriorityErrorUnhealthy) - if upstream == nil { - // This indicates a non-recoverable bug in the code. - panic("Upstream not found!") - } - - s.logger.Info( - "Routing to an upstream with high error rate.", - zap.String("ID", upstream.ID), - zap.String("GroupID", upstream.GroupID), - zap.String("HTTPURL", upstream.HTTPURL), - zap.String("WSURL", upstream.WSURL), - ) - - return upstream.ID, nil - } - - // TODO(polsar): If we get here, that means all upstreams are unhealthy, but they are all unhealthy - // due to a reason other than high latency or error rate. We should still be able to route to one of those. - // Asana task: https://app.asana.com/0/1207397277805097/1208186611173034/f - s.logger.Error("All upstreams are unhealthy due to reasons other than high latency or error rate.") - } - - // TODO(polsar): (Once the task above is complete.) If `alwaysRoute` is true, the only way we can get here is if - // there are no upstreams in `upstreamsByPriority`. This shouldn't be possible, so we should log a critical error. return "", DefaultNoHealthyUpstreamsError } - -// Partitions the given upstreams by their health status. 
-func partitionUpstreams(upstreamsByPriority types.PriorityToUpstreamsMap) map[conf.UnhealthyReason]types.PriorityToUpstreamsMap { - statusToUpstreamsByPriority := make(map[conf.UnhealthyReason]types.PriorityToUpstreamsMap) - - for priority, upstreams := range upstreamsByPriority { - for _, upstream := range upstreams { - status := upstream.HealthStatus - - if upstreamsByPriorityForStatus, statusExists := statusToUpstreamsByPriority[status]; statusExists { - // The priority-to-upstreams map exists for the status. - if upstreamsForStatusAndPriority, priorityExists := upstreamsByPriorityForStatus[priority]; priorityExists { - // The upstreams slice exists for the status and priority, so append to it. - upstreamsByPriorityForStatus[priority] = append(upstreamsForStatusAndPriority, upstream) - } else { - // The upstreams slice does not exist for the status and priority, so create it. - upstreamsByPriorityForStatus[priority] = []*conf.UpstreamConfig{upstream} - } - } else { - // The priority-to-upstreams map does not exist for the status, so create it. - statusToUpstreamsByPriority[status] = types.PriorityToUpstreamsMap{ - priority: []*conf.UpstreamConfig{upstream}, - } - } - } - } - - return statusToUpstreamsByPriority -} - -// Returns the first upstream with the highest priority in the given map. Note the in our case the highest priority -// corresponds to the lowest int value. -func getHighestPriorityUpstream(upstreamsByPriority types.PriorityToUpstreamsMap) *conf.UpstreamConfig { - priorities := maps.Keys(upstreamsByPriority) - - if len(priorities) == 0 { - return nil - } - - maxPriority := slices.Min(priorities) - upstreams := upstreamsByPriority[maxPriority] - - if len(upstreams) == 0 { - // If a priority is a key in the passed map, there must be at least one upstream for it. 
- panic("No upstreams found!") - } - - return upstreams[0] -} diff --git a/internal/route/routing_strategy_test.go b/internal/route/routing_strategy_test.go index a3db870b..96b7025d 100644 --- a/internal/route/routing_strategy_test.go +++ b/internal/route/routing_strategy_test.go @@ -17,7 +17,7 @@ func TestPriorityStrategy_HighPriority(t *testing.T) { 1: {cfg("erigon")}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) @@ -40,7 +40,7 @@ func TestPriorityStrategy_LowerPriority(t *testing.T) { 1: {cfg("fallback1"), cfg("fallback2")}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) @@ -57,7 +57,7 @@ func TestPriorityStrategy_NoUpstreams(t *testing.T) { 1: {}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { upstreamID, err := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index 9839362f..8b40b14f 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -28,7 +28,7 @@ type singleChainObjectGraph struct { } func wireSingleChainDependencies( - globalConfig config.GlobalConfig, //nolint:gocritic // Legacy + globalConfig *config.GlobalConfig, chainConfig *config.SingleChainConfig, logger *zap.Logger, rpcCache *cache.RPCCache, @@ -47,11 +47,6 @@ func wireSingleChainDependencies( logger, ) - alwaysRoute := false - if chainConfig.Routing.AlwaysRoute != nil { - alwaysRoute = *chainConfig.Routing.AlwaysRoute - } - enabledNodeFilters := []route.NodeFilterType{ route.Healthy, route.MaxHeightForGroup, @@ -65,10 
+60,41 @@ func wireSingleChainDependencies( logger, &chainConfig.Routing, ) - routingStrategy := route.FilteringRoutingStrategy{ - NodeFilter: nodeFilter, - BackingStrategy: route.NewPriorityRoundRobinStrategy(logger, alwaysRoute), - Logger: logger, + + // Determine if we should always route even if no healthy upstreams are available. + alwaysRoute := false + if chainConfig.Routing.AlwaysRoute != nil { + alwaysRoute = *chainConfig.Routing.AlwaysRoute + } + + // If we should always route, use AlwaysRouteFilteringStrategy. Otherwise, use FilteringRoutingStrategy. + backingStrategy := route.NewPriorityRoundRobinStrategy(logger) + + var routingStrategy route.RoutingStrategy + + errorFilter := route.IsErrorRateAcceptable{HealthCheckManager: healthCheckManager} + latencyFilter := route.IsLatencyAcceptable{HealthCheckManager: healthCheckManager} + + if alwaysRoute { + routingStrategy = &route.AlwaysRouteFilteringStrategy{ + NodeFilters: []route.NodeFilter{ + nodeFilter, + &errorFilter, + &latencyFilter, + }, + RemovableFilters: []route.NodeFilterType{ + route.GetFilterTypeName(errorFilter), + route.GetFilterTypeName(latencyFilter), + }, + BackingStrategy: backingStrategy, + Logger: logger, + } + } else { + routingStrategy = &route.FilteringRoutingStrategy{ + NodeFilter: nodeFilter, + BackingStrategy: backingStrategy, + Logger: logger, + } } router := route.NewRouter( @@ -78,7 +104,7 @@ func wireSingleChainDependencies( chainConfig.Groups, chainMetadataStore, healthCheckManager, - &routingStrategy, + routingStrategy, metricContainer, logger, rpcCache, @@ -114,7 +140,7 @@ func WireDependenciesForAllChains( childLogger := rootLogger.With(zap.String("chainName", currentChainConfig.ChainName)) dependencyContainer := wireSingleChainDependencies( - gatewayConfig.Global, + &gatewayConfig.Global, currentChainConfig, childLogger, rpcCache, diff --git a/internal/server/web_server_e2e_test.go b/internal/server/web_server_e2e_test.go index 1c28ce4e..5169b104 100644 --- 
a/internal/server/web_server_e2e_test.go +++ b/internal/server/web_server_e2e_test.go @@ -332,10 +332,15 @@ func executeRequest( handler.ServeHTTP(recorder, req) - result := recorder.Result() + result := recorder.Result() //nolint:bodyclose // Body is closed in the defer statement below. resultBody, _ := io.ReadAll(result.Body) - defer result.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { //nolint:errorlint // Wrong error. + t.Errorf("Error closing response body: %s", err) + } + }(result.Body) responseBody, err := jsonrpc.DecodeResponseBody(resultBody) assert.NoError(t, err) @@ -414,7 +419,7 @@ func handleSingleRequest(t *testing.T, request jsonrpc.SingleRequestBody, case "net_peerCount": return jsonrpc.SingleResponseBody{Result: getResultFromString(hexutil.Uint64(10).String())} - case config.PassiveLatencyCheckMethod: + case "eth_chainId": return jsonrpc.SingleResponseBody{Result: getResultFromString(hexutil.Uint64(11).String())} case "eth_getBlockByNumber": @@ -424,7 +429,7 @@ func handleSingleRequest(t *testing.T, request jsonrpc.SingleRequestBody, }) return jsonrpc.SingleResponseBody{ - Result: json.RawMessage(result), + Result: result, } case "eth_blockNumber": @@ -454,7 +459,7 @@ func setUpUnhealthyUpstream(t *testing.T) *httptest.Server { switch r := requestBody.(type) { case *jsonrpc.SingleRequestBody: switch requestBody.GetMethod() { - case "eth_syncing", "net_peerCount", config.PassiveLatencyCheckMethod, "eth_getBlockByNumber": + case "eth_syncing", "net_peerCount", "eth_chainId", "eth_getBlockByNumber": responseBody = &jsonrpc.SingleResponseBody{Error: &jsonrpc.Error{Message: "This is a failing fake node!"}} writeResponseBody(t, writer, responseBody) default: diff --git a/internal/types/types.go b/internal/types/types.go index 466b8a32..a08c2a9d 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -11,6 +11,7 @@ type UpstreamStatus struct { BlockHeightCheck BlockHeightChecker PeerCheck Checker 
SyncingCheck Checker + ErrorCheck ErrorLatencyChecker LatencyCheck ErrorLatencyChecker ID string GroupID string @@ -40,8 +41,7 @@ type Checker interface { //go:generate mockery --output ../mocks --name ErrorLatencyChecker --with-expecter type ErrorLatencyChecker interface { - RunPassiveCheck() - GetUnhealthyReason(methods []string) config.UnhealthyReason + IsPassing(methods []string) bool RecordRequest(data *RequestData) } From e85b2ee32f8966ab4587881655601afc2b221288 Mon Sep 17 00:00:00 2001 From: polsar88 Date: Mon, 7 Oct 2024 14:09:38 -0700 Subject: [PATCH 3/3] feat: split error latency checker into separate structs (#142) * fix: remove a TODO * fix: field alignment * fix: field alignment * fix: readability * fix: update `README.md` * feat: initial implementation of `AlwaysRouteRoutingStrategy` * fix: comment formatting * fix: comment phrasing * fix: add a TODO * fix: renaming * fix: revert to the previous version of `PriorityRoundRobinStrategy` * fix: remove `HealthStatus` field * feat: LatencyChecker can now be configured to only check latency, errors, or both * feat: create separate instances of LatencyChecker for error and latency checking * feat: plumbing for `AlwaysRouteFilteringStrategy` * fix: tests * fix: removable filters * feat: add debug logging * fix: move `getFilterTypeName` function to `node_filter` module * refactor: deduplicate filter upstreams code * refactor: remove passive error/latency checker code * refactor: split error latency checker into two structs * fix: error and latency filters are now passed to regular filtering routing strategy * fix: typo * refactor: add check enabled flag to error and latency structs * refactor: rename struct field --- internal/checks/common.go | 50 ++ internal/checks/error.go | 221 +++++++++ .../{errorlatency_test.go => error_test.go} | 0 internal/checks/errorlatency.go | 453 ------------------ internal/checks/latency.go | 176 +++++++ internal/checks/latency_test.go | 1 + internal/checks/manager.go | 4 - 
internal/config/config.go | 2 + internal/config/config_test.go | 15 + internal/route/node_filter.go | 16 +- internal/server/object_graph.go | 15 +- 11 files changed, 484 insertions(+), 469 deletions(-) create mode 100644 internal/checks/common.go create mode 100644 internal/checks/error.go rename internal/checks/{errorlatency_test.go => error_test.go} (100%) delete mode 100644 internal/checks/errorlatency.go create mode 100644 internal/checks/latency.go create mode 100644 internal/checks/latency_test.go diff --git a/internal/checks/common.go b/internal/checks/common.go new file mode 100644 index 00000000..5563d2c9 --- /dev/null +++ b/internal/checks/common.go @@ -0,0 +1,50 @@ +package checks + +import ( + "math" + "time" + + "github.com/failsafe-go/failsafe-go/circuitbreaker" + "github.com/satsuma-data/node-gateway/internal/config" +) + +const ( + PercentPerFrac = 100 + MinNumRequestsForRate = 1 // The minimum number of requests required to compute the error rate. +) + +// NewCircuitBreaker abstracts away the rather complex circuitbreaker.Builder API. +// https://pkg.go.dev/github.com/failsafe-go/failsafe-go/circuitbreaker +// https://failsafe-go.dev/circuit-breaker/ +func NewCircuitBreaker( + errorRate float64, + detectionWindow time.Duration, + banWindow time.Duration, +) circuitbreaker.CircuitBreaker[any] { + // TODO(polsar): Check that `0.0 < errorRate <= 1.0` holds. + return circuitbreaker.Builder[any](). + HandleResult(false). // The false return value of the wrapped call will be interpreted as a failure. + WithFailureRateThreshold( + uint(math.Floor(errorRate*PercentPerFrac)), // Minimum percentage of failed requests to open the breaker. + MinNumRequestsForRate, + detectionWindow, + ). + WithDelay(banWindow). 
+ Build() +} + +func getDetectionWindow(routingConfig *config.RoutingConfig) time.Duration { + if routingConfig != nil && routingConfig.DetectionWindow != nil { + return *routingConfig.DetectionWindow + } + + return config.DefaultDetectionWindow +} + +func getBanWindow(routingConfig *config.RoutingConfig) time.Duration { + if routingConfig != nil && routingConfig.BanWindow != nil { + return *routingConfig.BanWindow + } + + return config.DefaultBanWindow +} diff --git a/internal/checks/error.go b/internal/checks/error.go new file mode 100644 index 00000000..72389b4e --- /dev/null +++ b/internal/checks/error.go @@ -0,0 +1,221 @@ +package checks + +import ( + "net/http" + "strconv" + "strings" + + "go.uber.org/zap" + + "github.com/failsafe-go/failsafe-go/circuitbreaker" + "github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metrics" + "github.com/satsuma-data/node-gateway/internal/types" +) + +const ResponseCodeWildcard = 'x' + +type ErrorCheck struct { + Err error + metricsContainer *metrics.Container + logger *zap.Logger + upstreamConfig *config.UpstreamConfig + routingConfig *config.RoutingConfig + errorCircuitBreaker ErrorCircuitBreaker + isCheckEnabled bool +} + +type ErrorCircuitBreaker interface { + RecordResponse(isError bool) + IsOpen() bool +} + +type ErrorStats struct { + circuitBreaker circuitbreaker.CircuitBreaker[any] +} + +func NewErrorStats(routingConfig *config.RoutingConfig) ErrorCircuitBreaker { + return &ErrorStats{ + circuitBreaker: NewCircuitBreaker( + getErrorsRate(routingConfig), + getDetectionWindow(routingConfig), + getBanWindow(routingConfig), + ), + } +} + +func NewErrorChecker( + upstreamConfig *config.UpstreamConfig, + routingConfig *config.RoutingConfig, + metricsContainer *metrics.Container, + logger *zap.Logger, +) types.ErrorLatencyChecker { + return &ErrorCheck{ + upstreamConfig: upstreamConfig, + routingConfig: routingConfig, + metricsContainer: metricsContainer, + logger: logger, + 
errorCircuitBreaker: NewErrorStats(routingConfig), + isCheckEnabled: routingConfig != nil && routingConfig.IsEnabled, + } +} + +func (c *ErrorCheck) isError(httpCode, jsonRPCCode, errorMsg string) bool { + if isMatchForPatterns(httpCode, c.routingConfig.Errors.HTTPCodes) || + isMatchForPatterns(jsonRPCCode, c.routingConfig.Errors.JSONRPCCodes) || + isErrorMatches(errorMsg, c.routingConfig.Errors.ErrorStrings) { + return true + } + + return false +} + +func isMatchForPatterns(responseCode string, patterns []string) bool { + if responseCode == "" { + return false + } + + if len(patterns) == 0 { + return true + } + + for _, pattern := range patterns { + if isMatch(responseCode, pattern) { + return true + } + } + + return false +} + +// Returns true iff the response code matches the pattern using ResponseCodeWildcard as the wildcard character. +func isMatch(responseCode, pattern string) bool { + if len(responseCode) != len(pattern) { + return false + } + + for i, x := range responseCode { + y := string(pattern[i]) + + if strings.EqualFold(y, string(ResponseCodeWildcard)) { + continue + } + + if string(x) != y { + return false + } + } + + return true +} + +func isErrorMatches(errorMsg string, errors []string) bool { + if errorMsg == "" { + return false + } + + if len(errors) == 0 { + return true + } + + for _, errorSubstr := range errors { + // TODO(polsar): Add support for regular expression matching. 
+ if strings.Contains(errorMsg, errorSubstr) { + return true + } + } + + return false +} + +func getErrorsRate(routingConfig *config.RoutingConfig) float64 { + if routingConfig != nil && routingConfig.Errors != nil { + return routingConfig.Errors.Rate + } + + return config.DefaultErrorRate +} + +func (e *ErrorStats) RecordResponse(isError bool) { + if isError { + e.circuitBreaker.RecordFailure() + } else { + e.circuitBreaker.RecordSuccess() + } +} + +func (e *ErrorStats) IsOpen() bool { + // TODO(polsar): We should be able to check `e.circuitBreaker.IsOpen()`, + // but it appears to remain open forever, regardless of the configured delay. + // We also must reset the circuit breaker manually if it is not supposed to be open. + isOpen := e.circuitBreaker.RemainingDelay() > 0 + if !isOpen { + e.circuitBreaker.Close() + } + + return isOpen +} + +func (c *ErrorCheck) IsPassing([]string) bool { + if !c.isCheckEnabled { + return true + } + + if c.errorCircuitBreaker != nil && c.errorCircuitBreaker.IsOpen() { + c.logger.Debug( + "ErrorCheck is not passing due to too many errors.", + zap.String("upstreamID", c.upstreamConfig.ID), + zap.Error(c.Err), + ) + + return false + } + + return true +} + +func (c *ErrorCheck) RecordRequest(data *types.RequestData) { + if !c.isCheckEnabled { + return + } + + errorString := "" + if data.Error != nil { + errorString = data.Error.Error() + } + + if data.HTTPResponseCode >= http.StatusBadRequest || data.ResponseBody == nil { + // No RPC responses are available since the HTTP request errored out or does not contain a JSON RPC response. + // TODO(polsar): We might want to emit a Prometheus stat like we do for an RPC error below. + c.errorCircuitBreaker.RecordResponse(c.isError( + strconv.Itoa(data.HTTPResponseCode), // Note that this CAN be 200 OK. 
+ "", + errorString, + )) + } else { // data.ResponseBody != nil + for _, resp := range data.ResponseBody.GetSubResponses() { + if resp.Error != nil { + // Do not ignore this response even if it does not correspond to an RPC request. + if c.isError("", strconv.Itoa(resp.Error.Code), resp.Error.Message) { + c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + metrics.HTTPRequest, + data.Method, + ).Inc() + + // Even though this is a single HTTP request, we count each RPC JSON subresponse error. + c.errorCircuitBreaker.RecordResponse(true) // JSON RPC subrequest error + } else { + c.errorCircuitBreaker.RecordResponse(false) // JSON RPC subrequest OK + } + } + } + } + + c.metricsContainer.ErrorLatency.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + data.Method, + ).Set(float64(data.Latency.Milliseconds())) +} diff --git a/internal/checks/errorlatency_test.go b/internal/checks/error_test.go similarity index 100% rename from internal/checks/errorlatency_test.go rename to internal/checks/error_test.go diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go deleted file mode 100644 index ab50c959..00000000 --- a/internal/checks/errorlatency.go +++ /dev/null @@ -1,453 +0,0 @@ -package checks - -import ( - "math" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/failsafe-go/failsafe-go/circuitbreaker" - - "github.com/satsuma-data/node-gateway/internal/client" - conf "github.com/satsuma-data/node-gateway/internal/config" - "github.com/satsuma-data/node-gateway/internal/metrics" - "github.com/satsuma-data/node-gateway/internal/types" - "go.uber.org/zap" -) - -const ( - ResponseCodeWildcard = 'x' - PercentPerFrac = 100 - MinNumRequestsForRate = 1 // The minimum number of requests required to compute the error rate. 
-) - -type ErrorCircuitBreaker interface { - RecordResponse(isError bool) - IsOpen() bool -} - -type LatencyCircuitBreaker interface { - RecordLatency(latency time.Duration) - IsOpen() bool - GetThreshold() time.Duration -} - -type ErrorStats struct { - circuitBreaker circuitbreaker.CircuitBreaker[any] -} - -func (e *ErrorStats) RecordResponse(isError bool) { - if isError { - e.circuitBreaker.RecordFailure() - } else { - e.circuitBreaker.RecordSuccess() - } -} - -func (e *ErrorStats) IsOpen() bool { - // TODO(polsar): We should be able to check `e.circuitBreaker.IsOpen()`, - // but it appears to remain open forever, regardless of the configured delay. - // We also must reset the circuit breaker manually if it is not supposed to be open. - isOpen := e.circuitBreaker.RemainingDelay() > 0 - if !isOpen { - e.circuitBreaker.Close() - } - - return isOpen -} - -func NewErrorStats(routingConfig *conf.RoutingConfig) ErrorCircuitBreaker { - return &ErrorStats{ - circuitBreaker: NewCircuitBreaker( - getErrorsRate(routingConfig), - getDetectionWindow(routingConfig), - getBanWindow(routingConfig), - ), - } -} - -type LatencyStats struct { - circuitBreaker circuitbreaker.CircuitBreaker[any] - threshold time.Duration -} - -func (l *LatencyStats) RecordLatency(latency time.Duration) { - if latency >= l.threshold { - l.circuitBreaker.RecordFailure() - } else { - l.circuitBreaker.RecordSuccess() - } -} - -func (l *LatencyStats) IsOpen() bool { - // TODO(polsar): We should be able to check `l.circuitBreaker.IsOpen()`, - // but it appears to remain open forever, regardless of the configured delay. - // We also must reset the circuit breaker manually if it is not supposed to be open. 
- isOpen := l.circuitBreaker.RemainingDelay() > 0 - if !isOpen { - l.circuitBreaker.Close() - } - - return isOpen -} - -func (l *LatencyStats) GetThreshold() time.Duration { - return l.threshold -} - -func NewLatencyStats(routingConfig *conf.RoutingConfig, method string) LatencyCircuitBreaker { - return &LatencyStats{ - threshold: getLatencyThreshold(routingConfig, method), - circuitBreaker: NewCircuitBreaker( - conf.DefaultLatencyTooHighRate, - getDetectionWindow(routingConfig), - getBanWindow(routingConfig), - ), - } -} - -// NewCircuitBreaker abstracts away the rather complex circuitbreaker.Builder API. -// https://pkg.go.dev/github.com/failsafe-go/failsafe-go/circuitbreaker -// https://failsafe-go.dev/circuit-breaker/ -func NewCircuitBreaker( - errorRate float64, - detectionWindow time.Duration, - banWindow time.Duration, -) circuitbreaker.CircuitBreaker[any] { - // TODO(polsar): Check that `0.0 < errorRate <= 1.0` holds. - return circuitbreaker.Builder[any](). - HandleResult(false). // The false return value of the wrapped call will be interpreted as a failure. - WithFailureRateThreshold( - uint(math.Floor(errorRate*PercentPerFrac)), // Minimum percentage of failed requests to open the breaker. - MinNumRequestsForRate, - detectionWindow, - ). - WithDelay(banWindow). - Build() -} - -// ErrorLatencyCheck -// Error checking is disabled if `errorCircuitBreaker` is nil. -// Latency checking is disabled if `methodLatencyBreaker` is nil. -// At least one of these two must be non-nil. 
-type ErrorLatencyCheck struct { - client client.EthClient - Err error - clientGetter client.EthClientGetter - metricsContainer *metrics.Container - logger *zap.Logger - upstreamConfig *conf.UpstreamConfig - routingConfig *conf.RoutingConfig - errorCircuitBreaker ErrorCircuitBreaker - methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker - lock sync.RWMutex -} - -func NewErrorChecker( - upstreamConfig *conf.UpstreamConfig, - routingConfig *conf.RoutingConfig, - clientGetter client.EthClientGetter, - metricsContainer *metrics.Container, - logger *zap.Logger, -) types.ErrorLatencyChecker { - return NewErrorLatencyChecker( - upstreamConfig, - routingConfig, - clientGetter, - metricsContainer, - logger, - true, - false, - ) -} - -func NewLatencyChecker( - upstreamConfig *conf.UpstreamConfig, - routingConfig *conf.RoutingConfig, - clientGetter client.EthClientGetter, - metricsContainer *metrics.Container, - logger *zap.Logger, -) types.ErrorLatencyChecker { - return NewErrorLatencyChecker( - upstreamConfig, - routingConfig, - clientGetter, - metricsContainer, - logger, - false, - true, - ) -} - -func NewErrorLatencyChecker( - upstreamConfig *conf.UpstreamConfig, - routingConfig *conf.RoutingConfig, - clientGetter client.EthClientGetter, - metricsContainer *metrics.Container, - logger *zap.Logger, - enableErrorChecking bool, - enableLatencyChecking bool, -) types.ErrorLatencyChecker { - if !enableErrorChecking && !enableLatencyChecking { - panic("ErrorLatencyCheck must have at least one of error or latency checking enabled.") - } - - // Create the error circuit breaker if error checking is enabled. - var errorCircuitBreaker ErrorCircuitBreaker - if enableErrorChecking { - errorCircuitBreaker = NewErrorStats(routingConfig) - } - - // Create the latency circuit breaker if latency checking is enabled. 
- var latencyCircuitBreaker map[string]LatencyCircuitBreaker - if enableLatencyChecking { - latencyCircuitBreaker = make(map[string]LatencyCircuitBreaker) - } - - return &ErrorLatencyCheck{ - upstreamConfig: upstreamConfig, - routingConfig: routingConfig, - clientGetter: clientGetter, - metricsContainer: metricsContainer, - logger: logger, - errorCircuitBreaker: errorCircuitBreaker, - methodLatencyBreaker: latencyCircuitBreaker, - } -} - -// Returns the LatencyStats instance corresponding to the specified RPC method. -// This method is thread-safe. -func (c *ErrorLatencyCheck) getLatencyCircuitBreaker(method string) LatencyCircuitBreaker { - c.lock.Lock() - defer c.lock.Unlock() - - stats, exists := c.methodLatencyBreaker[method] - - if !exists { - // This is the first time we are checking this method so initialize its LatencyStats instance. - stats = NewLatencyStats(c.routingConfig, method) - c.methodLatencyBreaker[method] = stats - } - - return stats -} - -// IsPassing -// TODO(polsar): Split this method into two separate methods: IsPassingError and IsPassingLatency. -func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { - if !c.routingConfig.IsEnhancedRoutingControlDefined() { - return true - } - - if c.errorCircuitBreaker != nil && c.errorCircuitBreaker.IsOpen() { - c.logger.Debug( - "ErrorLatencyCheck is not passing due to too many errors.", - zap.String("upstreamID", c.upstreamConfig.ID), - zap.Error(c.Err), - ) - - return false - } - - c.lock.Lock() - defer c.lock.Unlock() - - if c.methodLatencyBreaker == nil { - return true - } - - // Only consider the passed methods, even if other methods' circuit breakers might be open. - // - // TODO(polsar): If the circuit breaker for any of the passed methods is open, we consider this upstream - // as unhealthy for all the other passed methods, even if their circuit breakers are closed. 
This might - // be undesirable, though since all the methods are part of the same request, we would have to somehow - // modify the request to only contain the healthy methods. This seems like more trouble than is worth. - for _, method := range methods { - if breaker, exists := c.methodLatencyBreaker[method]; exists && breaker.IsOpen() { - c.logger.Debug( - "ErrorLatencyCheck is not passing due to high latency of an RPC method.", - zap.String("upstreamID", c.upstreamConfig.ID), - zap.Any("method", method), - zap.Error(c.Err), - ) - - return false - } - } - - return true -} - -// RecordRequest -// TODO(polsar): Split this method into two separate methods: RecordError and RecordLatency. -func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { - if !c.routingConfig.IsEnhancedRoutingControlDefined() { - return - } - - // Record the request latency if latency checking is enabled. - if c.methodLatencyBreaker != nil { - latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) - latencyCircuitBreaker.RecordLatency(data.Latency) - - if data.Latency >= latencyCircuitBreaker.GetThreshold() { - c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - data.Method, - ).Inc() - } - } - - // If error checking is disabled, we can return early. - if c.errorCircuitBreaker == nil { - return - } - - errorString := "" - if data.Error != nil { - errorString = data.Error.Error() - } - - if data.HTTPResponseCode >= http.StatusBadRequest || data.ResponseBody == nil { - // No RPC responses are available since the HTTP request errored out or does not contain a JSON RPC response. - // TODO(polsar): We might want to emit a Prometheus stat like we do for an RPC error below. - c.errorCircuitBreaker.RecordResponse(c.isError( - strconv.Itoa(data.HTTPResponseCode), // Note that this CAN be 200 OK. 
- "", - errorString, - )) - } else { // data.ResponseBody != nil - for _, resp := range data.ResponseBody.GetSubResponses() { - if resp.Error != nil { - // Do not ignore this response even if it does not correspond to an RPC request. - if c.isError("", strconv.Itoa(resp.Error.Code), resp.Error.Message) { - c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - data.Method, - ).Inc() - - // Even though this is a single HTTP request, we count each RPC JSON subresponse error. - c.errorCircuitBreaker.RecordResponse(true) // JSON RPC subrequest error - } else { - c.errorCircuitBreaker.RecordResponse(false) // JSON RPC subrequest OK - } - } - } - } - - c.metricsContainer.ErrorLatency.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - data.Method, - ).Set(float64(data.Latency.Milliseconds())) -} - -func (c *ErrorLatencyCheck) isError(httpCode, jsonRPCCode, errorMsg string) bool { - if isMatchForPatterns(httpCode, c.routingConfig.Errors.HTTPCodes) || - isMatchForPatterns(jsonRPCCode, c.routingConfig.Errors.JSONRPCCodes) || - isErrorMatches(errorMsg, c.routingConfig.Errors.ErrorStrings) { - return true - } - - return false -} - -func isMatchForPatterns(responseCode string, patterns []string) bool { - if responseCode == "" { - return false - } - - if len(patterns) == 0 { - return true - } - - for _, pattern := range patterns { - if isMatch(responseCode, pattern) { - return true - } - } - - return false -} - -// Returns true iff the response code matches the pattern using ResponseCodeWildcard as the wildcard character. 
-func isMatch(responseCode, pattern string) bool { - if len(responseCode) != len(pattern) { - return false - } - - for i, x := range responseCode { - y := string(pattern[i]) - - if strings.EqualFold(y, string(ResponseCodeWildcard)) { - continue - } - - if string(x) != y { - return false - } - } - - return true -} - -func isErrorMatches(errorMsg string, errors []string) bool { - if errorMsg == "" { - return false - } - - if len(errors) == 0 { - return true - } - - for _, errorSubstr := range errors { - // TODO(polsar): Add support for regular expression matching. - if strings.Contains(errorMsg, errorSubstr) { - return true - } - } - - return false -} - -func getDetectionWindow(routingConfig *conf.RoutingConfig) time.Duration { - if routingConfig != nil && routingConfig.DetectionWindow != nil { - return *routingConfig.DetectionWindow - } - - return conf.DefaultDetectionWindow -} - -func getBanWindow(routingConfig *conf.RoutingConfig) time.Duration { - if routingConfig != nil && routingConfig.BanWindow != nil { - return *routingConfig.BanWindow - } - - return conf.DefaultBanWindow -} - -func getLatencyThreshold(routingConfig *conf.RoutingConfig, method string) time.Duration { - if routingConfig != nil && routingConfig.Latency != nil { - if latency, exists := routingConfig.Latency.MethodLatencyThresholds[method]; exists { - return latency - } - - return routingConfig.Latency.Threshold - } - - return conf.DefaultMaxLatency -} - -func getErrorsRate(routingConfig *conf.RoutingConfig) float64 { - if routingConfig != nil && routingConfig.Errors != nil { - return routingConfig.Errors.Rate - } - - return conf.DefaultErrorRate -} diff --git a/internal/checks/latency.go b/internal/checks/latency.go new file mode 100644 index 00000000..c5b58b69 --- /dev/null +++ b/internal/checks/latency.go @@ -0,0 +1,176 @@ +package checks + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/failsafe-go/failsafe-go/circuitbreaker" + 
"github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metrics" + "github.com/satsuma-data/node-gateway/internal/types" +) + +type LatencyCheck struct { + Err error + metricsContainer *metrics.Container + logger *zap.Logger + upstreamConfig *config.UpstreamConfig + routingConfig *config.RoutingConfig + methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker + lock sync.RWMutex + isCheckEnabled bool +} + +type LatencyCircuitBreaker interface { + RecordLatency(latency time.Duration) + IsOpen() bool + GetThreshold() time.Duration +} + +type LatencyStats struct { + circuitBreaker circuitbreaker.CircuitBreaker[any] + threshold time.Duration +} + +func (l *LatencyStats) RecordLatency(latency time.Duration) { + if latency >= l.threshold { + l.circuitBreaker.RecordFailure() + } else { + l.circuitBreaker.RecordSuccess() + } +} + +func (l *LatencyStats) GetThreshold() time.Duration { + return l.threshold +} + +func NewLatencyStats(routingConfig *config.RoutingConfig, method string) LatencyCircuitBreaker { + return &LatencyStats{ + threshold: getLatencyThreshold(routingConfig, method), + circuitBreaker: NewCircuitBreaker( + config.DefaultLatencyTooHighRate, + getDetectionWindow(routingConfig), + getBanWindow(routingConfig), + ), + } +} + +func NewLatencyChecker( + upstreamConfig *config.UpstreamConfig, + routingConfig *config.RoutingConfig, + metricsContainer *metrics.Container, + logger *zap.Logger, +) types.ErrorLatencyChecker { + return &LatencyCheck{ + upstreamConfig: upstreamConfig, + routingConfig: routingConfig, + metricsContainer: metricsContainer, + logger: logger, + methodLatencyBreaker: make(map[string]LatencyCircuitBreaker), + isCheckEnabled: routingConfig != nil && routingConfig.IsEnabled, + } +} + +// Returns the LatencyStats instance corresponding to the specified RPC method. +// This method is thread-safe. 
+func (c *LatencyCheck) getLatencyCircuitBreaker(method string) LatencyCircuitBreaker { + c.lock.Lock() + defer c.lock.Unlock() + + stats, exists := c.methodLatencyBreaker[method] + + if !exists { + // This is the first time we are checking this method so initialize its LatencyStats instance. + stats = NewLatencyStats(c.routingConfig, method) + c.methodLatencyBreaker[method] = stats + } + + return stats +} + +func getLatencyThreshold(routingConfig *config.RoutingConfig, method string) time.Duration { + if routingConfig != nil && routingConfig.Latency != nil { + if latency, exists := routingConfig.Latency.MethodLatencyThresholds[method]; exists { + return latency + } + + return routingConfig.Latency.Threshold + } + + return config.DefaultMaxLatency +} + +func (l *LatencyStats) IsOpen() bool { + // TODO(polsar): We should be able to check `l.circuitBreaker.IsOpen()`, + // but it appears to remain open forever, regardless of the configured delay. + // We also must reset the circuit breaker manually if it is not supposed to be open. + isOpen := l.circuitBreaker.RemainingDelay() > 0 + if !isOpen { + l.circuitBreaker.Close() + } + + return isOpen +} + +func (c *LatencyCheck) IsPassing(methods []string) bool { + if !c.isCheckEnabled { + return true + } + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.methodLatencyBreaker == nil { + return true + } + + // Only consider the passed methods, even if other methods' circuit breakers might be open. + // + // TODO(polsar): If the circuit breaker for any of the passed methods is open, we consider this upstream + // as unhealthy for all the other passed methods, even if their circuit breakers are closed. This might + // be undesirable, though since all the methods are part of the same request, we would have to somehow + // modify the request to only contain the healthy methods. This seems like more trouble than it is worth. 
+ for _, method := range methods { + if breaker, exists := c.methodLatencyBreaker[method]; exists && breaker.IsOpen() { + c.logger.Debug( + "LatencyCheck is not passing due to high latency of an RPC method.", + zap.String("upstreamID", c.upstreamConfig.ID), + zap.Any("method", method), + zap.Error(c.Err), + ) + + return false + } + } + + return true +} + +func (c *LatencyCheck) RecordRequest(data *types.RequestData) { + if !c.isCheckEnabled { + return + } + + // Record the request latency if latency checking is enabled. + if c.methodLatencyBreaker != nil { + latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) + latencyCircuitBreaker.RecordLatency(data.Latency) + + if data.Latency >= latencyCircuitBreaker.GetThreshold() { + c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + metrics.HTTPRequest, + data.Method, + ).Inc() + } + } + + c.metricsContainer.ErrorLatency.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + data.Method, + ).Set(float64(data.Latency.Milliseconds())) +} diff --git a/internal/checks/latency_test.go b/internal/checks/latency_test.go new file mode 100644 index 00000000..ca6f3f31 --- /dev/null +++ b/internal/checks/latency_test.go @@ -0,0 +1 @@ +package checks diff --git a/internal/checks/manager.go b/internal/checks/manager.go index 18ce65f9..efd3db8d 100644 --- a/internal/checks/manager.go +++ b/internal/checks/manager.go @@ -59,14 +59,12 @@ type healthCheckManager struct { newErrorCheck func( *conf.UpstreamConfig, *conf.RoutingConfig, - client.EthClientGetter, *metrics.Container, *zap.Logger, ) types.ErrorLatencyChecker newLatencyCheck func( *conf.UpstreamConfig, *conf.RoutingConfig, - client.EthClientGetter, *metrics.Container, *zap.Logger, ) types.ErrorLatencyChecker @@ -215,7 +213,6 @@ func (h *healthCheckManager) initializeChecks() { errorCheck = h.newErrorCheck( &config, &h.routingConfig, - client.NewEthClient, h.metricsContainer, 
h.logger, ) @@ -231,7 +228,6 @@ func (h *healthCheckManager) initializeChecks() { latencyCheck = h.newLatencyCheck( &config, &h.routingConfig, - client.NewEthClient, h.metricsContainer, h.logger, ) diff --git a/internal/config/config.go b/internal/config/config.go index d57aeb8d..0b84ae90 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -352,6 +352,7 @@ type RoutingConfig struct { BanWindow *time.Duration `yaml:"banWindow"` MaxBlocksBehind int `yaml:"maxBlocksBehind"` IsInitialized bool + IsEnabled bool } // IsEnhancedRoutingControlDefined returns true iff any of the enhanced routing control fields are specified @@ -434,6 +435,7 @@ func (r *RoutingConfig) setDefaults(globalConfig *RoutingConfig, force bool) boo } r.IsInitialized = true + r.IsEnabled = true return true } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 006961d5..8695cc95 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -432,6 +432,7 @@ func TestParseConfig_ValidConfigLatencyRouting_AllFieldsSet(t *testing.T) { Latency: &expectedLatencyConfig, AlwaysRoute: newBool(true), IsInitialized: true, + IsEnabled: true, } expectedRoutingChainConfig := expectedRoutingConfig @@ -521,6 +522,7 @@ func TestParseConfig_ValidConfigLatencyRouting_ErrorsConfigOverridesAndMerges(t Latency: &LatencyConfig{MethodLatencyThresholds: map[string]time.Duration{}}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, } expectedRoutingChainConfig := expectedRoutingConfig @@ -601,6 +603,7 @@ func TestParseConfig_ValidConfigLatencyRouting_DefaultsForDetectionAndBanWindows Latency: &expectedLatencyConfig, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, Chains: getCommonChainsConfig(&RoutingConfig{ @@ -610,6 +613,7 @@ func TestParseConfig_ValidConfigLatencyRouting_DefaultsForDetectionAndBanWindows Latency: &expectedLatencyConfig, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, 
}), } @@ -761,6 +765,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencySp Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, @@ -786,6 +791,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencySp Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), } @@ -849,6 +855,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencyNo Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, @@ -859,6 +866,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencyNo Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), } @@ -927,6 +935,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencySp Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, @@ -951,6 +960,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencySp Errors: &ErrorsConfig{Rate: DefaultErrorRate}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), } @@ -1057,6 +1067,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencyNo Errors: &expectedErrorsConfig, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, @@ -1090,6 +1101,7 @@ func TestParseConfig_ValidConfigLatencyRouting_MethodLatencies_TopLevelLatencyNo Errors: &expectedErrorsConfig, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), } @@ -1161,6 +1173,7 @@ func TestParseConfig_ValidConfigLatencyRouting_NoGlobalRoutingConfig_TwoChains_O Errors: &ErrorsConfig{Rate: 0.25}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }, }, Chains: 
append(getCommonChainsConfig(&RoutingConfig{ @@ -1179,6 +1192,7 @@ func TestParseConfig_ValidConfigLatencyRouting_NoGlobalRoutingConfig_TwoChains_O Errors: &ErrorsConfig{Rate: 0.25}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), append(getCommonChainsConfig(&RoutingConfig{ DetectionWindow: NewDuration(DefaultDetectionWindow), BanWindow: NewDuration(DefaultBanWindow), @@ -1196,6 +1210,7 @@ func TestParseConfig_ValidConfigLatencyRouting_NoGlobalRoutingConfig_TwoChains_O Errors: &ErrorsConfig{Rate: 0.25}, AlwaysRoute: newBool(false), IsInitialized: true, + IsEnabled: true, }), getCommonChainsConfig(&RoutingConfig{})...)...), } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 5c7ac9e3..2216611d 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -26,6 +26,14 @@ type AndFilter struct { isTopLevel bool } +func NewAndFilter(filters []NodeFilter, logger *zap.Logger) *AndFilter { + return &AndFilter{ + logger: logger, + filters: filters, + isTopLevel: true, + } +} + func (a *AndFilter) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, numUpstreamsInPriorityGroup int) bool { var result = true @@ -120,9 +128,7 @@ type IsErrorRateAcceptable struct { func (f *IsErrorRateAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) - errorCheck, _ := upstreamStatus.ErrorCheck.(*checks.ErrorLatencyCheck) - - return errorCheck.IsPassing(requestMetadata.Methods) + return upstreamStatus.ErrorCheck.IsPassing(requestMetadata.Methods) } type IsLatencyAcceptable struct { @@ -131,9 +137,7 @@ type IsLatencyAcceptable struct { func (f *IsLatencyAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) - 
latencyCheck, _ := upstreamStatus.LatencyCheck.(*checks.ErrorLatencyCheck) - - return latencyCheck.IsPassing(requestMetadata.Methods) + return upstreamStatus.LatencyCheck.IsPassing(requestMetadata.Methods) } type IsCloseToGlobalMaxHeight struct { diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index 8b40b14f..035977ce 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -75,13 +75,16 @@ func wireSingleChainDependencies( errorFilter := route.IsErrorRateAcceptable{HealthCheckManager: healthCheckManager} latencyFilter := route.IsLatencyAcceptable{HealthCheckManager: healthCheckManager} + // These should be ordered from most important to least important. + nodeFilters := []route.NodeFilter{ + nodeFilter, + &errorFilter, + &latencyFilter, + } + if alwaysRoute { routingStrategy = &route.AlwaysRouteFilteringStrategy{ - NodeFilters: []route.NodeFilter{ - nodeFilter, - &errorFilter, - &latencyFilter, - }, + NodeFilters: nodeFilters, RemovableFilters: []route.NodeFilterType{ route.GetFilterTypeName(errorFilter), route.GetFilterTypeName(latencyFilter), @@ -91,7 +94,7 @@ func wireSingleChainDependencies( } } else { routingStrategy = &route.FilteringRoutingStrategy{ - NodeFilter: nodeFilter, + NodeFilter: route.NewAndFilter(nodeFilters, logger), BackingStrategy: backingStrategy, Logger: logger, }