From 366febb1bffe91f39f332e8beb356207fac67067 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 15:30:31 -0700 Subject: [PATCH 01/21] fix: remove a TODO --- internal/server/object_graph.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index 68a5622..9839362 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -52,8 +52,6 @@ func wireSingleChainDependencies( alwaysRoute = *chainConfig.Routing.AlwaysRoute } - // TODO(polsar): Here, the HealthCheckManager is wired into the primary FilteringRoutingStrategy. - // We may need to wire it into the secondary PriorityRoundRobinStrategy as well. enabledNodeFilters := []route.NodeFilterType{ route.Healthy, route.MaxHeightForGroup, From bb4ab4b8f7349cf31b7ab7bf29b735970c779181 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 15:49:58 -0700 Subject: [PATCH 02/21] fix: field alignment --- internal/route/node_filter_test.go | 157 +++++++++++++++++++++++------ 1 file changed, 127 insertions(+), 30 deletions(-) diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 462211c..9fa86e0 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -33,24 +33,49 @@ func TestAndFilter_Apply(t *testing.T) { filters []NodeFilter } - type argsType struct { //nolint:govet // field alignment doesn't matter in tests - requestMetadata metadata.RequestMetadata + type argsType struct { upstreamConfig *config.UpstreamConfig + requestMetadata metadata.RequestMetadata } args := argsType{upstreamConfig: cfg("test-node")} - tests := []struct { //nolint:govet // field alignment doesn't matter in tests + tests := []struct { + args argsType name string fields fields - args argsType want bool }{ - {"No filters", fields{}, args, true}, - {"One passing filter", fields{[]NodeFilter{AlwaysPass{}}}, args, true}, - {"Multiple passing filters", fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysPass{}}}, args, true}, - {"One failing filter", fields{[]NodeFilter{AlwaysFail{}}}, args, false}, - {"Multiple passing and one failing", fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}}, args, false}, + { + args, + "No filters", + fields{}, + true, + }, + { + args, + "One passing filter", + fields{[]NodeFilter{AlwaysPass{}}}, + true, + }, + { + args, + "Multiple passing filters", + fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysPass{}}}, + true, + }, + { + args, + "One failing filter", + fields{[]NodeFilter{AlwaysFail{}}}, + false, + }, + { + args, + "Multiple passing and one failing", + fields{[]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}}, + false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -58,7 +83,7 @@ func TestAndFilter_Apply(t *testing.T) { filters: tt.fields.filters, } ok := a.Apply(tt.args.requestMetadata, tt.args.upstreamConfig, 1) - assert.Equalf(t, tt.want, ok, "Apply(%v, %v)", tt.args.requestMetadata, tt.args.upstreamConfig, 1) + assert.Equalf(t, tt.want, ok, "Apply(%v, %v)", tt.args.requestMetadata, tt.args.upstreamConfig) }) } } @@ -174,32 +199,104 @@ func TestMethodsAllowedFilter_Apply(t *testing.T) { // Batch requests stateMethodsMetadata := metadata.RequestMetadata{Methods: []string{"eth_getBalance", "eth_getBlockNumber"}} - type args struct { //nolint:govet // field alignment doesn't matter in tests - requestMetadata metadata.RequestMetadata + type args struct { upstreamConfig *config.UpstreamConfig + requestMetadata metadata.RequestMetadata } - tests := []struct { //nolint:govet // field alignment doesn't matter in tests - name string + tests := []struct { args args + name string want bool }{ - {"stateMethodFullNode", args{stateMethodMetadata, &fullNodeConfig}, false}, - {"stateMethodFullNodeWithArchiveMethodEnabled", - args{stateMethodMetadata, &fullNodeConfigWithArchiveMethodEnabled}, true}, - {"stateMethodArchiveNode", args{stateMethodMetadata, &archiveNodeConfig}, true}, - {"stateMethodArchiveNodeWithMethodDisabled", - args{stateMethodMetadata, &archiveNodeConfigWithMethodDisabled}, false}, - {"nonStateMethodFullNode", args{nonStateMethodMetadata, &fullNodeConfig}, true}, - {"nonStateMethodFullNodeWithMethodDisabled", - args{nonStateMethodMetadata, &fullNodeConfigWithFullMethodDisabled}, false}, - {"nonStateMethodArchiveNode", args{nonStateMethodMetadata, &archiveNodeConfig}, true}, - {"batchRequestsStateMethodsFullNode", args{stateMethodsMetadata, &fullNodeConfig}, false}, - {"batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", - args{stateMethodsMetadata, &fullNodeConfigWithArchiveMethodEnabled}, true}, - {"batchRequestsStateMethodsArchiveNode", args{stateMethodsMetadata, &archiveNodeConfig}, true}, - {"batchRequestsStateMethodsArchiveNodeWithMethodDisabled", - args{stateMethodsMetadata, &archiveNodeConfigWithMethodDisabled}, false}, + { + args{ + &fullNodeConfig, + stateMethodMetadata, + }, + "stateMethodFullNode", + false, + }, + { + args{ + &fullNodeConfigWithArchiveMethodEnabled, + stateMethodMetadata, + }, + "stateMethodFullNodeWithArchiveMethodEnabled", + true, + }, + { + args{ + &archiveNodeConfig, + stateMethodMetadata, + }, + "stateMethodArchiveNode", + true, + }, + { + args{ + &archiveNodeConfigWithMethodDisabled, + stateMethodMetadata, + }, + "stateMethodArchiveNodeWithMethodDisabled", + false, + }, + { + args{ + &fullNodeConfig, + nonStateMethodMetadata, + }, + "nonStateMethodFullNode", + true, + }, + { + args{ + &fullNodeConfigWithFullMethodDisabled, + nonStateMethodMetadata, + }, + "nonStateMethodFullNodeWithMethodDisabled", + false, + }, + { + args{ + &archiveNodeConfig, + nonStateMethodMetadata, + }, + "nonStateMethodArchiveNode", + true, + }, + { + args{ + &fullNodeConfig, + stateMethodsMetadata, + }, + "batchRequestsStateMethodsFullNode", + false, + }, + { + args{ + &fullNodeConfigWithArchiveMethodEnabled, + stateMethodsMetadata, + }, + "batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", + true, + }, + { + args{ + &archiveNodeConfig, + stateMethodsMetadata, + }, + "batchRequestsStateMethodsArchiveNode", + true, + }, + { + args{ + &archiveNodeConfigWithMethodDisabled, + stateMethodsMetadata, + }, + "batchRequestsStateMethodsArchiveNodeWithMethodDisabled", + false, + }, } for _, tt := range tests { From 36891fb0f13f0505b6fcc7a3d523a0f438073b8e Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 15:57:26 -0700 Subject: [PATCH 03/21] fix: field alignment --- internal/route/node_filter_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 9fa86e0..1743113 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -205,96 +205,96 @@ func TestMethodsAllowedFilter_Apply(t *testing.T) { } tests := []struct { - args args name string + args args want bool }{ { + "stateMethodFullNode", args{ &fullNodeConfig, stateMethodMetadata, }, - "stateMethodFullNode", false, }, { + "stateMethodFullNodeWithArchiveMethodEnabled", args{ &fullNodeConfigWithArchiveMethodEnabled, stateMethodMetadata, }, - "stateMethodFullNodeWithArchiveMethodEnabled", true, }, { + "stateMethodArchiveNode", args{ &archiveNodeConfig, stateMethodMetadata, }, - "stateMethodArchiveNode", true, }, { + "stateMethodArchiveNodeWithMethodDisabled", args{ &archiveNodeConfigWithMethodDisabled, stateMethodMetadata, }, - "stateMethodArchiveNodeWithMethodDisabled", false, }, { + "nonStateMethodFullNode", args{ &fullNodeConfig, nonStateMethodMetadata, }, - "nonStateMethodFullNode", true, }, { + "nonStateMethodFullNodeWithMethodDisabled", args{ &fullNodeConfigWithFullMethodDisabled, nonStateMethodMetadata, }, - "nonStateMethodFullNodeWithMethodDisabled", false, }, { + "nonStateMethodArchiveNode", args{ &archiveNodeConfig, nonStateMethodMetadata, }, - "nonStateMethodArchiveNode", true, }, { + "batchRequestsStateMethodsFullNode", args{ &fullNodeConfig, stateMethodsMetadata, }, - "batchRequestsStateMethodsFullNode", false, }, { + "batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", args{ &fullNodeConfigWithArchiveMethodEnabled, stateMethodsMetadata, }, - "batchRequestsStateMethodsFullNodeWithArchiveMethodEnabled", true, }, { + "batchRequestsStateMethodsArchiveNode", args{ &archiveNodeConfig, stateMethodsMetadata, }, - "batchRequestsStateMethodsArchiveNode", true, }, { + "batchRequestsStateMethodsArchiveNodeWithMethodDisabled", args{ &archiveNodeConfigWithMethodDisabled, stateMethodsMetadata, }, - "batchRequestsStateMethodsArchiveNodeWithMethodDisabled", false, }, } From bd672bd9f4773ee577a9d676c2f620cb6be0f95c Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 16:08:41 -0700 Subject: [PATCH 04/21] fix: readability --- internal/route/node_filter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 61f2acc..f78d1db 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -10,7 +10,11 @@ import ( const DefaultMaxBlocksBehind = 10 type NodeFilter interface { - Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, numUpstreamsInPriorityGroup int) bool + Apply( + requestMetadata metadata.RequestMetadata, + upstreamConfig *config.UpstreamConfig, + numUpstreamsInPriorityGroup int, + ) bool } type AndFilter struct { From d5b839eb8f888f8e4eb2043471ccfaeefc037201 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:28:52 -0700 Subject: [PATCH 05/21] fix: update `README.md` --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 3e7c4de..9dc066a 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,16 @@ Running all tests that match regexp `TestHealthCheckManager`: go test -v -run TestHealthCheckManager ./... ``` +Running all tests in a specific package that match regexp `TestHealthCheckManager`: +```sh +go test -v github.com/satsuma-data/node-gateway/internal/route -run 'Test_RemoveFilters*' +``` + +Running a specific test: +```sh +go test -v github.com/satsuma-data/node-gateway/internal/route -run Test_RemoveFilters_RemoveNone +``` + To measure test code coverage, install the following tool and run the above `go test` command with the `-cover` flag: ```sh go get golang.org/x/tools/cmd/cover From 1d92136c13c2d85fc080ff8876530a71b17c9f61 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:29:28 -0700 Subject: [PATCH 06/21] feat: initial implementation of `AlwaysRouteRoutingStrategy` --- .../route/always_route_filtering_strategy.go | 132 ++++++++++++++++++ .../always_route_filtering_strategy_test.go | 83 +++++++++++ 2 files changed, 215 insertions(+) create mode 100644 internal/route/always_route_filtering_strategy.go create mode 100644 internal/route/always_route_filtering_strategy_test.go diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go new file mode 100644 index 0000000..7e45f70 --- /dev/null +++ b/internal/route/always_route_filtering_strategy.go @@ -0,0 +1,132 @@ +package route + +import ( + "reflect" + "strings" + + "github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metadata" + "github.com/satsuma-data/node-gateway/internal/types" + "go.uber.org/zap" +) + +type AlwaysRouteRoutingStrategy struct { + BackingStrategy RoutingStrategy + Logger *zap.Logger + NodeFilters []NodeFilter + RemovableFilters []NodeFilterType +} + +func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, +) (string, error) { + // Create a copy of removable filters to avoid modifying the original slice. + removableFilters := make([]NodeFilterType, len(s.RemovableFilters)) + copy(removableFilters, s.RemovableFilters) + + filters := s.NodeFilters // WARNING: Slice assignment, not a copy! + for len(filters) > 0 { + // Get all healthy upstreams for the current set of filters. + upstreams := s.FilterUpstreams(upstreamsByPriority, requestMetadata, filters) + + // If there is at least one healthy upstream, route using the backing strategy. + if len(upstreams) > 0 { + return s.BackingStrategy.RouteNextRequest(upstreams, requestMetadata) + } + + // If there are no more filters to remove, and there are no healthy upstreams + // found, so give up. + if len(removableFilters) == 0 { + break + } + + // Remove the next filter and try again. + idx := len(removableFilters) - 1 + nextFilterToRemove := removableFilters[idx] + removableFilters = removableFilters[:idx] + filters = removeFilters(filters, nextFilterToRemove) + } + + // If all removable filters are exhausted and no healthy upstreams are found, + // pass in the original list of upstreams to the backing strategy. + return s.BackingStrategy.RouteNextRequest(upstreamsByPriority, requestMetadata) +} + +// FilterUpstreams filters upstreams based on the provided NodeFilter. +// WARNING: If a given priority does not have any healthy upstreams, +// +// it will not be included in the returned map. +func (s *AlwaysRouteRoutingStrategy) FilterUpstreams( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, + nodeFilters []NodeFilter, +) types.PriorityToUpstreamsMap { + priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap) + + // Iterate over each priority and filter the associated upstreams. + for priority, upstreamConfigs := range upstreamsByPriority { + s.Logger.Debug( + "Determining healthy upstreams at priority.", + zap.Int("priority", priority), + zap.Any("upstreams", upstreamConfigs), + ) + + filteredUpstreams := make([]*config.UpstreamConfig, 0) + + // Iterate over each upstream and apply the filters. + for _, upstreamConfig := range upstreamConfigs { + pass := true + for _, nodeFilter := range nodeFilters { + pass = nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) + if !pass { + break + } + } + + if pass { + filteredUpstreams = append(filteredUpstreams, upstreamConfig) + } + } + + // Only add the priority to the map if there is at least one healthy upstream. + if len(filteredUpstreams) > 0 { + priorityToHealthyUpstreams[priority] = filteredUpstreams + } + } + + return priorityToHealthyUpstreams +} + +// removeFilters returns a new slice of filters with the given filter type removed. +func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFilter { + retFilters := make([]NodeFilter, 0) + + for _, filter := range filters { + if getFilterTypeName(filter) != filterToRemove { + retFilters = append(retFilters, filter) + } + } + + return retFilters +} + +func getFilterTypeName(v interface{}) NodeFilterType { + t := reflect.TypeOf(v) + + // If it's a pointer, get the element type. + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + // Extract the name of the type and remove the package path. + typeName := t.String() + lastDotIndex := strings.LastIndex(typeName, ".") + + if lastDotIndex != -1 { + // Remove the package path, keep only the type name. + typeName = typeName[lastDotIndex+1:] + } + + return NodeFilterType(typeName) +} diff --git a/internal/route/always_route_filtering_strategy_test.go b/internal/route/always_route_filtering_strategy_test.go new file mode 100644 index 0000000..5756959 --- /dev/null +++ b/internal/route/always_route_filtering_strategy_test.go @@ -0,0 +1,83 @@ +package route + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_getFilterTypeName(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + NodeFilterType("AlwaysPass"), + getFilterTypeName(AlwaysPass{}), + ) + Assert.Equal( + NodeFilterType("AlwaysFail"), + getFilterTypeName(AlwaysFail{}), + ) + Assert.Equal( + NodeFilterType("AndFilter"), + getFilterTypeName(&AndFilter{}), + ) +} + +func Test_RemoveFilters_RemoveNone(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysFail{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}, &AndFilter{}}, + removeFilters([]NodeFilter{AlwaysFail{}, &AndFilter{}}, "AlwaysPass"), + ) +} + +func Test_RemoveFilters_RemoveOne(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysFail{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{&AndFilter{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysPass{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}}, "AndFilter"), + ) +} + +func Test_RemoveFilters_RemoveTwo(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + []NodeFilter{}, + removeFilters([]NodeFilter{AlwaysPass{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{&AndFilter{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}, AlwaysPass{}}, "AlwaysPass"), + ) + Assert.Equal( + []NodeFilter{AlwaysPass{}}, + removeFilters([]NodeFilter{AlwaysPass{}, &AndFilter{}, &AndFilter{}}, "AndFilter"), + ) + Assert.Equal( + []NodeFilter{AlwaysFail{}}, + removeFilters([]NodeFilter{AlwaysPass{}, AlwaysPass{}, AlwaysFail{}}, "AlwaysPass"), + ) +} From 1193c149c81647a9574b7c9f5081ee7e1f6f8d3d Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:34:37 -0700 Subject: [PATCH 07/21] fix: comment formatting --- internal/route/always_route_filtering_strategy.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 7e45f70..9de3d28 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -55,8 +55,7 @@ func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( // FilterUpstreams filters upstreams based on the provided NodeFilter. // WARNING: If a given priority does not have any healthy upstreams, -// -// it will not be included in the returned map. +// it will not be included in the returned map. func (s *AlwaysRouteRoutingStrategy) FilterUpstreams( upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata, From 6ecd34e15ea241b3d332f261d5da70cf10b97d56 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:36:14 -0700 Subject: [PATCH 08/21] fix: comment phrasing --- internal/route/always_route_filtering_strategy.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 9de3d28..dda56bc 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -35,8 +35,7 @@ func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( return s.BackingStrategy.RouteNextRequest(upstreams, requestMetadata) } - // If there are no more filters to remove, and there are no healthy upstreams - // found, so give up. + // There are no more filters to remove and no healthy upstreams found, so give up. if len(removableFilters) == 0 { break } From 866fc13746efcdcad47b80f7c22595b8b7bf05fd Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:41:27 -0700 Subject: [PATCH 09/21] fix: add a TODO --- internal/route/always_route_filtering_strategy.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index dda56bc..92f4c8b 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -49,6 +49,9 @@ func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( // If all removable filters are exhausted and no healthy upstreams are found, // pass in the original list of upstreams to the backing strategy. + // + // TODO(polsar): Eventually, we want all filters to be removable. Once that is the case, + // we should not be able to get here. return s.BackingStrategy.RouteNextRequest(upstreamsByPriority, requestMetadata) } From 1cf33a62c872c5db231daa74eb287b6716ffdd31 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 19:46:18 -0700 Subject: [PATCH 10/21] fix: renaming --- internal/route/always_route_filtering_strategy.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 92f4c8b..c9a94e8 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -10,14 +10,14 @@ import ( "go.uber.org/zap" ) -type AlwaysRouteRoutingStrategy struct { +type AlwaysRouteFilteringStrategy struct { BackingStrategy RoutingStrategy Logger *zap.Logger NodeFilters []NodeFilter RemovableFilters []NodeFilterType } -func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( +func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata, ) (string, error) { @@ -58,7 +58,7 @@ func (s *AlwaysRouteRoutingStrategy) RouteNextRequest( // FilterUpstreams filters upstreams based on the provided NodeFilter. // WARNING: If a given priority does not have any healthy upstreams, // it will not be included in the returned map. -func (s *AlwaysRouteRoutingStrategy) FilterUpstreams( +func (s *AlwaysRouteFilteringStrategy) FilterUpstreams( upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata, nodeFilters []NodeFilter, From 017c250771ad5a770cae6cc305bf60b9be7699fd Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 20:11:59 -0700 Subject: [PATCH 11/21] fix: revert to the previous version of `PriorityRoundRobinStrategy` --- internal/route/routing_strategy.go | 135 ++--------------------------- 1 file changed, 8 insertions(+), 127 deletions(-) diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index df43d41..8fa2b49 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -1,11 +1,9 @@ package route import ( - "slices" "sort" "sync/atomic" - conf "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "go.uber.org/zap" @@ -20,18 +18,15 @@ type RoutingStrategy interface { requestMetadata metadata.RequestMetadata, ) (string, error) } - type PriorityRoundRobinStrategy struct { - logger *zap.Logger - counter uint64 - alwaysRoute bool + logger *zap.Logger + counter uint64 } -func NewPriorityRoundRobinStrategy(logger *zap.Logger, alwaysRoute bool) *PriorityRoundRobinStrategy { +func NewPriorityRoundRobinStrategy(logger *zap.Logger) *PriorityRoundRobinStrategy { return &PriorityRoundRobinStrategy{ - logger: logger, - counter: 0, - alwaysRoute: alwaysRoute, + logger: logger, + counter: 0, } } @@ -49,134 +44,20 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( upstreamsByPriority types.PriorityToUpstreamsMap, _ metadata.RequestMetadata, ) (string, error) { - statusToUpstreamsByPriority := partitionUpstreams(upstreamsByPriority) - - var healthyUpstreamsByPriority types.PriorityToUpstreamsMap - - var exists bool - - if healthyUpstreamsByPriority, exists = statusToUpstreamsByPriority[conf.ReasonUnknownOrHealthy]; !exists { - // There are no healthy upstreams. - healthyUpstreamsByPriority = make(types.PriorityToUpstreamsMap) - } - - prioritySorted := maps.Keys(healthyUpstreamsByPriority) + prioritySorted := maps.Keys(upstreamsByPriority) sort.Ints(prioritySorted) - // Note that `prioritySorted` can be empty, in which case the body of this loop will not be executed even once. for _, priority := range prioritySorted { - upstreams := healthyUpstreamsByPriority[priority] + upstreams := upstreamsByPriority[priority] if len(upstreams) > 0 { atomic.AddUint64(&s.counter, 1) - return upstreams[int(s.counter)%len(upstreams)].ID, nil //nolint:nolintlint,gosec // Legacy + return upstreams[int(s.counter)%len(upstreams)].ID, nil } s.logger.Debug("Did not find any healthy nodes in priority.", zap.Int("priority", priority)) } - // No healthy upstreams are available. If `alwaysRoute` is true, find an unhealthy upstream to route to anyway. - // TODO(polsar): At this time, the only unhealthy upstreams that can end up here are those due to high latency - // or error rate. Pass ALL configured upstreams in `upstreamsByPriority`. Their health status should be indicated - // in the UpstreamConfig.HealthStatus field. - if s.alwaysRoute { - s.logger.Warn("No healthy upstreams found but `alwaysRoute` is set to true.") - - // If available, return an upstream that's unhealthy due to high latency rate. - if upstreamsByPriorityLatencyUnhealthy, ok := statusToUpstreamsByPriority[conf.ReasonLatencyTooHighRate]; ok { - upstream := getHighestPriorityUpstream(upstreamsByPriorityLatencyUnhealthy) - if upstream == nil { - // This indicates a non-recoverable bug in the code. - panic("Upstream not found!") - } - - s.logger.Info( - "Routing to an upstream with high latency.", - zap.String("ID", upstream.ID), - zap.String("GroupID", upstream.GroupID), - zap.String("HTTPURL", upstream.HTTPURL), - zap.String("WSURL", upstream.WSURL), - ) - - return upstream.ID, nil - } - - // If available, return an upstream that's unhealthy due to high error rate. - if upstreamsByPriorityErrorUnhealthy, ok := statusToUpstreamsByPriority[conf.ReasonErrorRate]; ok { - upstream := getHighestPriorityUpstream(upstreamsByPriorityErrorUnhealthy) - if upstream == nil { - // This indicates a non-recoverable bug in the code. - panic("Upstream not found!") - } - - s.logger.Info( - "Routing to an upstream with high error rate.", - zap.String("ID", upstream.ID), - zap.String("GroupID", upstream.GroupID), - zap.String("HTTPURL", upstream.HTTPURL), - zap.String("WSURL", upstream.WSURL), - ) - - return upstream.ID, nil - } - - // TODO(polsar): If we get here, that means all upstreams are unhealthy, but they are all unhealthy - // due to a reason other than high latency or error rate. We should still be able to route to one of those. - // Asana task: https://app.asana.com/0/1207397277805097/1208186611173034/f - s.logger.Error("All upstreams are unhealthy due to reasons other than high latency or error rate.") - } - - // TODO(polsar): (Once the task above is complete.) If `alwaysRoute` is true, the only way we can get here is if - // there are no upstreams in `upstreamsByPriority`. This shouldn't be possible, so we should log a critical error. return "", DefaultNoHealthyUpstreamsError } - -// Partitions the given upstreams by their health status. -func partitionUpstreams(upstreamsByPriority types.PriorityToUpstreamsMap) map[conf.UnhealthyReason]types.PriorityToUpstreamsMap { - statusToUpstreamsByPriority := make(map[conf.UnhealthyReason]types.PriorityToUpstreamsMap) - - for priority, upstreams := range upstreamsByPriority { - for _, upstream := range upstreams { - status := upstream.HealthStatus - - if upstreamsByPriorityForStatus, statusExists := statusToUpstreamsByPriority[status]; statusExists { - // The priority-to-upstreams map exists for the status. - if upstreamsForStatusAndPriority, priorityExists := upstreamsByPriorityForStatus[priority]; priorityExists { - // The upstreams slice exists for the status and priority, so append to it. - upstreamsByPriorityForStatus[priority] = append(upstreamsForStatusAndPriority, upstream) - } else { - // The upstreams slice does not exist for the status and priority, so create it. - upstreamsByPriorityForStatus[priority] = []*conf.UpstreamConfig{upstream} - } - } else { - // The priority-to-upstreams map does not exist for the status, so create it. - statusToUpstreamsByPriority[status] = types.PriorityToUpstreamsMap{ - priority: []*conf.UpstreamConfig{upstream}, - } - } - } - } - - return statusToUpstreamsByPriority -} - -// Returns the first upstream with the highest priority in the given map. Note the in our case the highest priority -// corresponds to the lowest int value. -func getHighestPriorityUpstream(upstreamsByPriority types.PriorityToUpstreamsMap) *conf.UpstreamConfig { - priorities := maps.Keys(upstreamsByPriority) - - if len(priorities) == 0 { - return nil - } - - maxPriority := slices.Min(priorities) - upstreams := upstreamsByPriority[maxPriority] - - if len(upstreams) == 0 { - // If a priority is a key in the passed map, there must be at least one upstream for it. - panic("No upstreams found!") - } - - return upstreams[0] -} From 0059f34f2ea0e40bdcffd7671fe6a8e3ed772502 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 20:40:25 -0700 Subject: [PATCH 12/21] fix: remove `HealthStatus` field --- internal/checks/errorlatency.go | 10 ++++----- internal/config/config.go | 13 ----------- internal/mocks/ErrorLatencyChecker.go | 32 +++++++++++++-------------- internal/route/node_filter.go | 13 +---------- internal/types/types.go | 2 +- 5 files changed, 22 insertions(+), 48 deletions(-) diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index 44aa139..4aa2a6d 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -310,9 +310,9 @@ func (c *ErrorLatencyCheck) runPassiveCheckForMethod(method string, latencyThres c.logger.Debug("Ran passive ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Any("latency", duration), zap.Error(c.Err)) } -func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyReason { +func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { if !c.routingConfig.IsEnhancedRoutingControlDefined() { - return conf.ReasonUnknownOrHealthy + return true } if c.errorCircuitBreaker.IsOpen() { @@ -322,7 +322,7 @@ func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyR zap.Error(c.Err), ) - return conf.ReasonErrorRate + return false } c.lock.Lock() @@ -343,11 +343,11 @@ func (c *ErrorLatencyCheck) GetUnhealthyReason(methods []string) conf.UnhealthyR zap.Error(c.Err), ) - return conf.ReasonLatencyTooHighRate + return false } } - return conf.ReasonUnknownOrHealthy + return true } func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { diff --git a/internal/config/config.go b/internal/config/config.go index 0591ab7..98d351a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -33,18 +33,6 @@ const ( PassiveLatencyChecking = false ) -// UnhealthyReason is the reason why a health check failed. We use it to select the most appropriate upstream to route to -// if all upstreams are unhealthy and the `alwaysRoute` option is true. -type UnhealthyReason int - -const ( - ReasonUnknownOrHealthy = iota - ReasonErrorRate - ReasonLatencyTooHighRate -) - -// UpstreamConfig -// TODO(polsar): Move the HealthStatus field to a new struct and embed this struct in it. Asana task: https://app.asana.com/0/1207397277805097/1208232039997185/f type UpstreamConfig struct { Methods MethodsConfig `yaml:"methods"` HealthCheckConfig HealthCheckConfig `yaml:"healthCheck"` @@ -55,7 +43,6 @@ type UpstreamConfig struct { GroupID string `yaml:"group"` NodeType NodeType `yaml:"nodeType"` RequestHeadersConfig []RequestHeaderConfig `yaml:"requestHeaders"` - HealthStatus UnhealthyReason // The default value of this field is 0 (ReasonUnknownOrHealthy). } func (c *UpstreamConfig) isValid(groups []GroupConfig) bool { diff --git a/internal/mocks/ErrorLatencyChecker.go b/internal/mocks/ErrorLatencyChecker.go index 312395b..46df736 100644 --- a/internal/mocks/ErrorLatencyChecker.go +++ b/internal/mocks/ErrorLatencyChecker.go @@ -3,10 +3,8 @@ package mocks import ( - config "github.com/satsuma-data/node-gateway/internal/config" - mock "github.com/stretchr/testify/mock" - types "github.com/satsuma-data/node-gateway/internal/types" + mock "github.com/stretchr/testify/mock" ) // ErrorLatencyChecker is an autogenerated mock type for the ErrorLatencyChecker type @@ -22,48 +20,48 @@ func (_m *ErrorLatencyChecker) EXPECT() *ErrorLatencyChecker_Expecter { return &ErrorLatencyChecker_Expecter{mock: &_m.Mock} } -// GetUnhealthyReason provides a mock function with given fields: methods -func (_m *ErrorLatencyChecker) GetUnhealthyReason(methods []string) config.UnhealthyReason { +// IsPassing provides a mock function with given fields: methods +func (_m *ErrorLatencyChecker) IsPassing(methods []string) bool { ret := _m.Called(methods) if len(ret) == 0 { - panic("no return value specified for GetUnhealthyReason") + panic("no return value specified for IsPassing") } - var r0 config.UnhealthyReason - if rf, ok := ret.Get(0).(func([]string) config.UnhealthyReason); ok { + var r0 bool + if rf, ok := ret.Get(0).(func([]string) bool); ok { r0 = rf(methods) } else { - r0 = ret.Get(0).(config.UnhealthyReason) + r0 = ret.Get(0).(bool) } return r0 } -// ErrorLatencyChecker_GetUnhealthyReason_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetUnhealthyReason' -type ErrorLatencyChecker_GetUnhealthyReason_Call struct { +// ErrorLatencyChecker_IsPassing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsPassing' +type ErrorLatencyChecker_IsPassing_Call struct { *mock.Call } -// GetUnhealthyReason is a helper method to define mock.On call +// IsPassing is a helper method to define mock.On call // - methods []string -func (_e *ErrorLatencyChecker_Expecter) GetUnhealthyReason(methods interface{}) *ErrorLatencyChecker_GetUnhealthyReason_Call { - return &ErrorLatencyChecker_GetUnhealthyReason_Call{Call: _e.mock.On("GetUnhealthyReason", methods)} +func (_e *ErrorLatencyChecker_Expecter) IsPassing(methods interface{}) *ErrorLatencyChecker_IsPassing_Call { + return &ErrorLatencyChecker_IsPassing_Call{Call: _e.mock.On("IsPassing", methods)} } -func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) Run(run func(methods []string)) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) Run(run func(methods []string)) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].([]string)) }) return _c } -func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) Return(_a0 config.UnhealthyReason) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) Return(_a0 bool) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Return(_a0) return _c } -func (_c *ErrorLatencyChecker_GetUnhealthyReason_Call) RunAndReturn(run func([]string) config.UnhealthyReason) *ErrorLatencyChecker_GetUnhealthyReason_Call { +func (_c *ErrorLatencyChecker_IsPassing_Call) RunAndReturn(run func([]string) bool) *ErrorLatencyChecker_IsPassing_Call { _c.Call.Return(run) return _c } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index f78d1db..43f78ce 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -121,18 +121,7 @@ func (f *IsLatencyAcceptable) Apply(requestMetadata metadata.RequestMetadata, up latencyCheck, _ := upstreamStatus.LatencyCheck.(*checks.ErrorLatencyCheck) - // TODO(polsar): If unhealthy, set the delta by which the check failed. - // For example, if the configured rate is 0.25 and the current error rate is 0.27, - // the delta is 0.27 - 0.25 = 0.02. This value can be used to rank upstreams by the - // degree to which they are unhealthy. This would help us choose the upstream to - // route to if all upstreams are unhealthy AND `alwaysRoute` config option is true. - upstreamConfig.HealthStatus = latencyCheck.GetUnhealthyReason(requestMetadata.Methods) - - // TODO(polsar): Note that for ErrorLatencyCheck only, we always return true. The health status - // of the upstream is instead contained in the struct's HealthStatus field. This is a bit - // clunky. Eventually, we want to change the signature of the Apply method. This will require - // significant refactoring. - return true + return latencyCheck.IsPassing(requestMetadata.Methods) } type IsCloseToGlobalMaxHeight struct { diff --git a/internal/types/types.go b/internal/types/types.go index 466b8a3..07635c5 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -41,7 +41,7 @@ type Checker interface { //go:generate mockery --output ../mocks --name ErrorLatencyChecker --with-expecter type ErrorLatencyChecker interface { RunPassiveCheck() - GetUnhealthyReason(methods []string) config.UnhealthyReason + IsPassing(methods []string) bool RecordRequest(data *RequestData) } From a677f465a69ef62c1ee60f45c9c7361522850ade Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 21:07:44 -0700 Subject: [PATCH 13/21] feat: LatencyChecker can now be configured to only check latency, errors, or both --- internal/checks/errorlatency.go | 62 ++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index 4aa2a6d..9e5cff2 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -129,6 +129,10 @@ func NewCircuitBreaker( Build() } +// ErrorLatencyCheck +// Error checking is disabled if `errorCircuitBreaker` is nil. +// Latency checking is disabled if `methodLatencyBreaker` is nil. +// At least one of these two must be non-nil. type ErrorLatencyCheck struct { client client.EthClient Err error @@ -149,18 +153,40 @@ func NewErrorLatencyChecker( clientGetter client.EthClientGetter, metricsContainer *metrics.Container, logger *zap.Logger, + enableErrorChecking bool, + enableLatencyChecking bool, ) types.ErrorLatencyChecker { + if !enableErrorChecking && !enableLatencyChecking { + panic("ErrorLatencyCheck must have at least one of error or latency checking enabled.") + } + + // Create the error circuit breaker if error checking is enabled. + var errorCircuitBreaker ErrorCircuitBreaker + if enableErrorChecking { + errorCircuitBreaker = NewErrorStats(routingConfig) + } + + // Create the latency circuit breaker if latency checking is enabled. + var latencyCircuitBreaker map[string]LatencyCircuitBreaker + if enableLatencyChecking { + latencyCircuitBreaker = make(map[string]LatencyCircuitBreaker) + } + c := &ErrorLatencyCheck{ upstreamConfig: upstreamConfig, routingConfig: routingConfig, clientGetter: clientGetter, metricsContainer: metricsContainer, logger: logger, - errorCircuitBreaker: NewErrorStats(routingConfig), - methodLatencyBreaker: make(map[string]LatencyCircuitBreaker), + errorCircuitBreaker: errorCircuitBreaker, + methodLatencyBreaker: latencyCircuitBreaker, ShouldRunPassiveHealthChecks: routingConfig.PassiveLatencyChecking && (routingConfig.Errors != nil || routingConfig.Latency != nil), } + if c.ShouldRunPassiveHealthChecks && !(enableErrorChecking && enableLatencyChecking) { + panic("ErrorLatencyCheck must have both error and latency checking enabled for passive health checks.") + } + if err := c.InitializePassiveCheck(); err != nil { logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig), zap.Error(err)) } @@ -315,7 +341,7 @@ func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { return true } - if c.errorCircuitBreaker.IsOpen() { + if c.errorCircuitBreaker != nil && c.errorCircuitBreaker.IsOpen() { c.logger.Debug( "ErrorLatencyCheck is not passing due to too many errors.", zap.String("upstreamID", c.upstreamConfig.ID), @@ -328,6 +354,10 @@ func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { c.lock.Lock() defer c.lock.Unlock() + if c.methodLatencyBreaker == nil { + return true + } + // Only consider the passed methods, even if other methods' circuit breakers might be open. // // TODO(polsar): If the circuit breaker for any of the passed methods is open, we consider this upstream @@ -359,16 +389,24 @@ func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { return } - latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) - latencyCircuitBreaker.RecordLatency(data.Latency) + // Record the request latency if latency checking is enabled. + if c.methodLatencyBreaker != nil { + latencyCircuitBreaker := c.getLatencyCircuitBreaker(data.Method) + latencyCircuitBreaker.RecordLatency(data.Latency) - if data.Latency >= latencyCircuitBreaker.GetThreshold() { - c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - data.Method, - ).Inc() + if data.Latency >= latencyCircuitBreaker.GetThreshold() { + c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( + c.upstreamConfig.ID, + c.upstreamConfig.HTTPURL, + metrics.HTTPRequest, + data.Method, + ).Inc() + } + } + + // If error checking is disabled, we can return early. + if c.errorCircuitBreaker == nil { + return } errorString := "" From 90846f655ba296d35e5a8d1872c55a0555d124c3 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 21:18:55 -0700 Subject: [PATCH 14/21] feat: create separate instances of LatencyChecker for error and latency checking --- internal/checks/errorlatency.go | 36 +++++++++++++++++++++++++++++++++ internal/checks/manager.go | 27 ++++++++++++++++++++++++- internal/types/types.go | 1 + 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index 9e5cff2..30d75f9 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -147,6 +147,42 @@ type ErrorLatencyCheck struct { ShouldRunPassiveHealthChecks bool } +func NewErrorChecker( + upstreamConfig *conf.UpstreamConfig, + routingConfig *conf.RoutingConfig, + clientGetter client.EthClientGetter, + metricsContainer *metrics.Container, + logger *zap.Logger, +) types.ErrorLatencyChecker { + return NewErrorLatencyChecker( + upstreamConfig, + routingConfig, + clientGetter, + metricsContainer, + logger, + true, + false, + ) +} + +func NewLatencyChecker( + upstreamConfig *conf.UpstreamConfig, + routingConfig *conf.RoutingConfig, + clientGetter client.EthClientGetter, + metricsContainer *metrics.Container, + logger *zap.Logger, +) types.ErrorLatencyChecker { + return NewErrorLatencyChecker( + upstreamConfig, + routingConfig, + clientGetter, + metricsContainer, + logger, + false, + true, + ) +} + func NewErrorLatencyChecker( upstreamConfig *conf.UpstreamConfig, routingConfig *conf.RoutingConfig, diff --git a/internal/checks/manager.go b/internal/checks/manager.go index c0c420f..513d5f4 100644 --- a/internal/checks/manager.go +++ b/internal/checks/manager.go @@ -56,6 +56,13 @@ type healthCheckManager struct { *metrics.Container, *zap.Logger, ) types.Checker + newErrorCheck func( + *conf.UpstreamConfig, + *conf.RoutingConfig, + client.EthClientGetter, + *metrics.Container, + *zap.Logger, + ) types.ErrorLatencyChecker newLatencyCheck func( *conf.UpstreamConfig, *conf.RoutingConfig, @@ -92,7 +99,8 @@ func NewHealthCheckManager( newBlockHeightCheck: NewBlockHeightChecker, newPeerCheck: NewPeerChecker, newSyncingCheck: NewSyncingChecker, - newLatencyCheck: NewErrorLatencyChecker, + newErrorCheck: NewErrorChecker, + newLatencyCheck: NewLatencyChecker, blockHeightObserver: blockHeightObserver, healthCheckTicker: healthCheckTicker, metricsContainer: metricsContainer, @@ -188,6 +196,22 @@ func (h *healthCheckManager) initializeChecks() { ) }() + var errorCheck types.ErrorLatencyChecker + + innerWG.Add(1) + + go func() { + defer innerWG.Done() + + errorCheck = h.newErrorCheck( + &config, + &h.routingConfig, + client.NewEthClient, + h.metricsContainer, + h.logger, + ) + }() + var latencyCheck types.ErrorLatencyChecker innerWG.Add(1) @@ -213,6 +237,7 @@ func (h *healthCheckManager) initializeChecks() { BlockHeightCheck: blockHeightCheck, PeerCheck: peerCheck, SyncingCheck: syncingCheck, + ErrorCheck: errorCheck, LatencyCheck: latencyCheck, }) mutex.Unlock() diff --git a/internal/types/types.go b/internal/types/types.go index 07635c5..0c52397 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -11,6 +11,7 @@ type UpstreamStatus struct { BlockHeightCheck BlockHeightChecker PeerCheck Checker SyncingCheck Checker + ErrorCheck ErrorLatencyChecker LatencyCheck ErrorLatencyChecker ID string GroupID string From 7e2ed8eb65177c426f7bc4be39885fb9d170b325 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 21:56:18 -0700 Subject: [PATCH 15/21] feat: plumbing for `AlwaysRouteFilteringStrategy` --- internal/checks/errorlatency.go | 4 +++ internal/checks/manager.go | 9 +++++ internal/route/node_filter.go | 29 +++++++++------ internal/route/routing_strategy.go | 2 +- internal/route/routing_strategy_test.go | 6 ++-- internal/server/object_graph.go | 47 ++++++++++++++++++------- 6 files changed, 71 insertions(+), 26 deletions(-) diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index 30d75f9..f217476 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -372,6 +372,8 @@ func (c *ErrorLatencyCheck) runPassiveCheckForMethod(method string, latencyThres c.logger.Debug("Ran passive ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Any("latency", duration), zap.Error(c.Err)) } +// IsPassing +// TODO(polsar): Split this method into two separate methods: IsPassingError and IsPassingLatency. func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { if !c.routingConfig.IsEnhancedRoutingControlDefined() { return true @@ -416,6 +418,8 @@ func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { return true } +// RecordRequest +// TODO(polsar): Split this method into two separate methods: RecordError and RecordLatency. func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { if c.routingConfig.PassiveLatencyChecking { return diff --git a/internal/checks/manager.go b/internal/checks/manager.go index 513d5f4..2c06e4f 100644 --- a/internal/checks/manager.go +++ b/internal/checks/manager.go @@ -126,7 +126,16 @@ func (h *healthCheckManager) GetUpstreamStatus(upstreamID string) *types.Upstrea panic(fmt.Sprintf("Upstream ID %s not found!", upstreamID)) } +func (h *healthCheckManager) GetErrorCheck(upstreamID string) types.ErrorLatencyChecker { + return h.GetUpstreamStatus(upstreamID).ErrorCheck +} + +func (h *healthCheckManager) GetLatencyCheck(upstreamID string) types.ErrorLatencyChecker { + return h.GetUpstreamStatus(upstreamID).LatencyCheck +} + func (h *healthCheckManager) RecordRequest(upstreamID string, data *types.RequestData) { + h.GetUpstreamStatus(upstreamID).ErrorCheck.RecordRequest(data) h.GetUpstreamStatus(upstreamID).LatencyCheck.RecordRequest(data) } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 43f78ce..e2b207e 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -111,14 +111,23 @@ func (f *IsDoneSyncing) Apply(_ metadata.RequestMetadata, upstreamConfig *config return true } +type IsErrorRateAcceptable struct { + HealthCheckManager checks.HealthCheckManager +} + +func (f *IsErrorRateAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { + upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) + errorCheck, _ := upstreamStatus.ErrorCheck.(*checks.ErrorLatencyCheck) + + return errorCheck.IsPassing(requestMetadata.Methods) +} + type IsLatencyAcceptable struct { - healthCheckManager checks.HealthCheckManager - logger *zap.Logger + HealthCheckManager checks.HealthCheckManager } func (f *IsLatencyAcceptable) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { - upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) - + upstreamStatus := f.HealthCheckManager.GetUpstreamStatus(upstreamConfig.ID) latencyCheck, _ := upstreamStatus.LatencyCheck.(*checks.ErrorLatencyCheck) return latencyCheck.IsPassing(requestMetadata.Methods) @@ -284,15 +293,9 @@ func CreateSingleNodeFilter( minimumPeerCount: checks.MinimumPeerCount, } - isLatencyAcceptable := IsLatencyAcceptable{ - healthCheckManager: manager, - logger: logger, - } - return &AndFilter{ filters: []NodeFilter{ &hasEnoughPeers, - &isLatencyAcceptable, }, logger: logger, } @@ -314,6 +317,10 @@ func CreateSingleNodeFilter( } case MethodsAllowed: return &AreMethodsAllowed{logger: logger} + case ErrorRateAcceptable: + panic("ErrorRateAcceptable filter is not implemented!") + case LatencyAcceptable: + panic("LatencyAcceptable filter is not implemented!") default: panic("Unknown filter type " + filterName + "!") } @@ -326,4 +333,6 @@ const ( NearGlobalMaxHeight NodeFilterType = "nearGlobalMaxHeight" MaxHeightForGroup NodeFilterType = "maxHeightForGroup" MethodsAllowed NodeFilterType = "methodsAllowed" + ErrorRateAcceptable NodeFilterType = "errorRateAcceptable" + LatencyAcceptable NodeFilterType = "latencyAcceptable" ) diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index 8fa2b49..1a8a443 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -53,7 +53,7 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( if len(upstreams) > 0 { atomic.AddUint64(&s.counter, 1) - return upstreams[int(s.counter)%len(upstreams)].ID, nil + return upstreams[int(s.counter)%len(upstreams)].ID, nil //nolint:nolintlint,gosec // Legacy } s.logger.Debug("Did not find any healthy nodes in priority.", zap.Int("priority", priority)) diff --git a/internal/route/routing_strategy_test.go b/internal/route/routing_strategy_test.go index a3db870..96b7025 100644 --- a/internal/route/routing_strategy_test.go +++ b/internal/route/routing_strategy_test.go @@ -17,7 +17,7 @@ func TestPriorityStrategy_HighPriority(t *testing.T) { 1: {cfg("erigon")}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) @@ -40,7 +40,7 @@ func TestPriorityStrategy_LowerPriority(t *testing.T) { 1: {cfg("fallback1"), cfg("fallback2")}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) @@ -57,7 +57,7 @@ func TestPriorityStrategy_NoUpstreams(t *testing.T) { 1: {}, } - strategy := NewPriorityRoundRobinStrategy(zap.L(), false) + strategy := NewPriorityRoundRobinStrategy(zap.L()) for i := 0; i < 10; i++ { upstreamID, err := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index 9839362..b53fd1e 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -28,7 +28,7 @@ type singleChainObjectGraph struct { } func wireSingleChainDependencies( - globalConfig config.GlobalConfig, //nolint:gocritic // Legacy + globalConfig *config.GlobalConfig, chainConfig *config.SingleChainConfig, logger *zap.Logger, rpcCache *cache.RPCCache, @@ -47,11 +47,6 @@ func wireSingleChainDependencies( logger, ) - alwaysRoute := false - if chainConfig.Routing.AlwaysRoute != nil { - alwaysRoute = *chainConfig.Routing.AlwaysRoute - } - enabledNodeFilters := []route.NodeFilterType{ route.Healthy, route.MaxHeightForGroup, @@ -65,10 +60,38 @@ func wireSingleChainDependencies( logger, &chainConfig.Routing, ) - routingStrategy := route.FilteringRoutingStrategy{ - NodeFilter: nodeFilter, - BackingStrategy: route.NewPriorityRoundRobinStrategy(logger, alwaysRoute), - Logger: logger, + + // Determine if we should always route even if no healthy upstreams are available. + alwaysRoute := false + if chainConfig.Routing.AlwaysRoute != nil { + alwaysRoute = *chainConfig.Routing.AlwaysRoute + } + + // If we should always route, use AlwaysRouteFilteringStrategy. Otherwise, use FilteringRoutingStrategy. + backingStrategy := route.NewPriorityRoundRobinStrategy(logger) + + var routingStrategy route.RoutingStrategy + + if alwaysRoute { + routingStrategy = &route.AlwaysRouteFilteringStrategy{ + NodeFilters: []route.NodeFilter{ + nodeFilter, + &route.IsErrorRateAcceptable{HealthCheckManager: healthCheckManager}, + &route.IsLatencyAcceptable{HealthCheckManager: healthCheckManager}, + }, + RemovableFilters: []route.NodeFilterType{ + route.ErrorRateAcceptable, + route.LatencyAcceptable, + }, + BackingStrategy: backingStrategy, + Logger: logger, + } + } else { + routingStrategy = &route.FilteringRoutingStrategy{ + NodeFilter: nodeFilter, + BackingStrategy: backingStrategy, + Logger: logger, + } } router := route.NewRouter( @@ -78,7 +101,7 @@ func wireSingleChainDependencies( chainConfig.Groups, chainMetadataStore, healthCheckManager, - &routingStrategy, + routingStrategy, metricContainer, logger, rpcCache, @@ -114,7 +137,7 @@ func WireDependenciesForAllChains( childLogger := rootLogger.With(zap.String("chainName", currentChainConfig.ChainName)) dependencyContainer := wireSingleChainDependencies( - gatewayConfig.Global, + &gatewayConfig.Global, currentChainConfig, childLogger, rpcCache, From f99e30d228741642d2bac8aeab750420b83ee5c0 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 22:00:19 -0700 Subject: [PATCH 16/21] fix: tests --- internal/checks/errorlatency_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/internal/checks/errorlatency_test.go b/internal/checks/errorlatency_test.go index 735250d..f9e8754 100644 --- a/internal/checks/errorlatency_test.go +++ b/internal/checks/errorlatency_test.go @@ -13,7 +13,7 @@ import ( "go.uber.org/zap" ) -func helperTestLatencyChecker(t *testing.T, latency1, latency2 time.Duration, reason config.UnhealthyReason) { +func helperTestLatencyChecker(t *testing.T, latency1, latency2 time.Duration, pass bool) { t.Helper() methods := []string{"eth_call", "eth_getLogs"} @@ -32,31 +32,33 @@ func helperTestLatencyChecker(t *testing.T, latency1, latency2 time.Duration, re mockEthClientGetter, metrics.NewContainer(config.TestChainName), zap.L(), + true, + true, ) - assert.Equal(t, reason, checker.GetUnhealthyReason(methods)) + assert.Equal(t, pass, checker.IsPassing(methods)) ethClient.AssertNumberOfCalls(t, "RecordLatency", 2) } func TestLatencyChecker_TwoMethods_BothLatenciesLessThanThreshold(t *testing.T) { - helperTestLatencyChecker(t, 2*time.Millisecond, 3*time.Millisecond, config.ReasonUnknownOrHealthy) + helperTestLatencyChecker(t, 2*time.Millisecond, 3*time.Millisecond, true) } func TestLatencyChecker_TwoMethods_BothLatenciesJustBelowThreshold(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, (2000-1)*time.Millisecond, config.ReasonUnknownOrHealthy) + helperTestLatencyChecker(t, (10000-1)*time.Millisecond, (2000-1)*time.Millisecond, true) } func TestLatencyChecker_TwoMethods_FirstLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10000*time.Millisecond, (2000-1)*time.Millisecond, config.ReasonLatencyTooHighRate) + helperTestLatencyChecker(t, 10000*time.Millisecond, (2000-1)*time.Millisecond, false) } func TestLatencyChecker_TwoMethods_SecondLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, 2000*time.Millisecond, config.ReasonLatencyTooHighRate) + helperTestLatencyChecker(t, (10000-1)*time.Millisecond, 2000*time.Millisecond, false) } func TestLatencyChecker_TwoMethods_BothLatenciesTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10002*time.Millisecond, 2003*time.Millisecond, config.ReasonLatencyTooHighRate) + helperTestLatencyChecker(t, 10002*time.Millisecond, 2003*time.Millisecond, false) } func Test_isMatchForPatterns_True(t *testing.T) { From bd234d060e0f3a166b1b8be856435cc37ace536f Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Thu, 3 Oct 2024 22:17:11 -0700 Subject: [PATCH 17/21] fix: removable filters --- internal/route/always_route_filtering_strategy.go | 4 ++-- .../route/always_route_filtering_strategy_test.go | 8 ++++---- internal/server/object_graph.go | 11 +++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index c9a94e8..1bf48d0 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -104,7 +104,7 @@ func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFi retFilters := make([]NodeFilter, 0) for _, filter := range filters { - if getFilterTypeName(filter) != filterToRemove { + if GetFilterTypeName(filter) != filterToRemove { retFilters = append(retFilters, filter) } } @@ -112,7 +112,7 @@ func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFi return retFilters } -func getFilterTypeName(v interface{}) NodeFilterType { +func GetFilterTypeName(v interface{}) NodeFilterType { t := reflect.TypeOf(v) // If it's a pointer, get the element type. diff --git a/internal/route/always_route_filtering_strategy_test.go b/internal/route/always_route_filtering_strategy_test.go index 5756959..12c27ff 100644 --- a/internal/route/always_route_filtering_strategy_test.go +++ b/internal/route/always_route_filtering_strategy_test.go @@ -6,20 +6,20 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_getFilterTypeName(t *testing.T) { +func Test_GetFilterTypeName(t *testing.T) { Assert := assert.New(t) Assert.Equal( NodeFilterType("AlwaysPass"), - getFilterTypeName(AlwaysPass{}), + GetFilterTypeName(AlwaysPass{}), ) Assert.Equal( NodeFilterType("AlwaysFail"), - getFilterTypeName(AlwaysFail{}), + GetFilterTypeName(AlwaysFail{}), ) Assert.Equal( NodeFilterType("AndFilter"), - getFilterTypeName(&AndFilter{}), + GetFilterTypeName(&AndFilter{}), ) } diff --git a/internal/server/object_graph.go b/internal/server/object_graph.go index b53fd1e..8b40b14 100644 --- a/internal/server/object_graph.go +++ b/internal/server/object_graph.go @@ -72,16 +72,19 @@ func wireSingleChainDependencies( var routingStrategy route.RoutingStrategy + errorFilter := route.IsErrorRateAcceptable{HealthCheckManager: healthCheckManager} + latencyFilter := route.IsLatencyAcceptable{HealthCheckManager: healthCheckManager} + if alwaysRoute { routingStrategy = &route.AlwaysRouteFilteringStrategy{ NodeFilters: []route.NodeFilter{ nodeFilter, - &route.IsErrorRateAcceptable{HealthCheckManager: healthCheckManager}, - &route.IsLatencyAcceptable{HealthCheckManager: healthCheckManager}, + &errorFilter, + &latencyFilter, }, RemovableFilters: []route.NodeFilterType{ - route.ErrorRateAcceptable, - route.LatencyAcceptable, + route.GetFilterTypeName(errorFilter), + route.GetFilterTypeName(latencyFilter), }, BackingStrategy: backingStrategy, Logger: logger, From 7ccf798f5e1bb3fc17980ed99fdc255a50468212 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Fri, 4 Oct 2024 17:10:08 -0700 Subject: [PATCH 18/21] feat: add debug logging --- internal/route/always_route_filtering_strategy.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 1bf48d0..d141288 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -45,6 +45,12 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( nextFilterToRemove := removableFilters[idx] removableFilters = removableFilters[:idx] filters = removeFilters(filters, nextFilterToRemove) + s.Logger.Debug( + "Removed a filter.", + zap.Any("upstreamsByPriority", upstreamsByPriority), + zap.Any("removedFilter", nextFilterToRemove), + zap.Any("remainingFilters", filters), + ) } // If all removable filters are exhausted and no healthy upstreams are found, From fe9de045c3bb83b2880a1c205317781b84abf534 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Fri, 4 Oct 2024 18:31:30 -0700 Subject: [PATCH 19/21] fix: move `getFilterTypeName` function to `node_filter` module --- .../route/always_route_filtering_strategy.go | 23 ------------------- .../always_route_filtering_strategy_test.go | 17 -------------- internal/route/node_filter.go | 23 +++++++++++++++++++ internal/route/node_filter_test.go | 17 ++++++++++++++ 4 files changed, 40 insertions(+), 40 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index c9a94e8..5d3a90b 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -1,9 +1,6 @@ package route import ( - "reflect" - "strings" - "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" @@ -111,23 +108,3 @@ func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFi return retFilters } - -func getFilterTypeName(v interface{}) NodeFilterType { - t := reflect.TypeOf(v) - - // If it's a pointer, get the element type. - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - - // Extract the name of the type and remove the package path. - typeName := t.String() - lastDotIndex := strings.LastIndex(typeName, ".") - - if lastDotIndex != -1 { - // Remove the package path, keep only the type name. - typeName = typeName[lastDotIndex+1:] - } - - return NodeFilterType(typeName) -} diff --git a/internal/route/always_route_filtering_strategy_test.go b/internal/route/always_route_filtering_strategy_test.go index 5756959..1e20efd 100644 --- a/internal/route/always_route_filtering_strategy_test.go +++ b/internal/route/always_route_filtering_strategy_test.go @@ -6,23 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_getFilterTypeName(t *testing.T) { - Assert := assert.New(t) - - Assert.Equal( - NodeFilterType("AlwaysPass"), - getFilterTypeName(AlwaysPass{}), - ) - Assert.Equal( - NodeFilterType("AlwaysFail"), - getFilterTypeName(AlwaysFail{}), - ) - Assert.Equal( - NodeFilterType("AndFilter"), - getFilterTypeName(&AndFilter{}), - ) -} - func Test_RemoveFilters_RemoveNone(t *testing.T) { Assert := assert.New(t) diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index f78d1db..01d1bd5 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -1,6 +1,9 @@ package route import ( + "reflect" + "strings" + "github.com/satsuma-data/node-gateway/internal/checks" "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" @@ -338,3 +341,23 @@ const ( MaxHeightForGroup NodeFilterType = "maxHeightForGroup" MethodsAllowed NodeFilterType = "methodsAllowed" ) + +func getFilterTypeName(v interface{}) NodeFilterType { + t := reflect.TypeOf(v) + + // If it's a pointer, get the element type. + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + // Extract the name of the type and remove the package path. + typeName := t.String() + lastDotIndex := strings.LastIndex(typeName, ".") + + if lastDotIndex != -1 { + // Remove the package path, keep only the type name. + typeName = typeName[lastDotIndex+1:] + } + + return NodeFilterType(typeName) +} diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 1743113..38bb983 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -315,3 +315,20 @@ func emitBlockHeight(store *metadata.ChainMetadataStore, groupID, upstreamID str func emitError(store *metadata.ChainMetadataStore, groupID, upstreamID string, err error) { store.ProcessErrorUpdate(groupID, upstreamID, err) } + +func Test_getFilterTypeName(t *testing.T) { + Assert := assert.New(t) + + Assert.Equal( + NodeFilterType("AlwaysPass"), + getFilterTypeName(AlwaysPass{}), + ) + Assert.Equal( + NodeFilterType("AlwaysFail"), + getFilterTypeName(AlwaysFail{}), + ) + Assert.Equal( + NodeFilterType("AndFilter"), + getFilterTypeName(&AndFilter{}), + ) +} From 84b73da56acaf753df2aa7243af8199262907e97 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Fri, 4 Oct 2024 19:20:41 -0700 Subject: [PATCH 20/21] refactor: deduplicate filter upstreams code --- .../route/always_route_filtering_strategy.go | 52 +++---------------- internal/route/filtering_strategy.go | 22 +++++++- 2 files changed, 26 insertions(+), 48 deletions(-) diff --git a/internal/route/always_route_filtering_strategy.go b/internal/route/always_route_filtering_strategy.go index 5d3a90b..1c9a01f 100644 --- a/internal/route/always_route_filtering_strategy.go +++ b/internal/route/always_route_filtering_strategy.go @@ -1,7 +1,6 @@ package route import ( - "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "go.uber.org/zap" @@ -25,7 +24,12 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( filters := s.NodeFilters // WARNING: Slice assignment, not a copy! for len(filters) > 0 { // Get all healthy upstreams for the current set of filters. - upstreams := s.FilterUpstreams(upstreamsByPriority, requestMetadata, filters) + upstreams := filterUpstreams( + upstreamsByPriority, + requestMetadata, + filters, + s.Logger, + ) // If there is at least one healthy upstream, route using the backing strategy. if len(upstreams) > 0 { @@ -52,50 +56,6 @@ func (s *AlwaysRouteFilteringStrategy) RouteNextRequest( return s.BackingStrategy.RouteNextRequest(upstreamsByPriority, requestMetadata) } -// FilterUpstreams filters upstreams based on the provided NodeFilter. -// WARNING: If a given priority does not have any healthy upstreams, -// it will not be included in the returned map. -func (s *AlwaysRouteFilteringStrategy) FilterUpstreams( - upstreamsByPriority types.PriorityToUpstreamsMap, - requestMetadata metadata.RequestMetadata, - nodeFilters []NodeFilter, -) types.PriorityToUpstreamsMap { - priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap) - - // Iterate over each priority and filter the associated upstreams. - for priority, upstreamConfigs := range upstreamsByPriority { - s.Logger.Debug( - "Determining healthy upstreams at priority.", - zap.Int("priority", priority), - zap.Any("upstreams", upstreamConfigs), - ) - - filteredUpstreams := make([]*config.UpstreamConfig, 0) - - // Iterate over each upstream and apply the filters. - for _, upstreamConfig := range upstreamConfigs { - pass := true - for _, nodeFilter := range nodeFilters { - pass = nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) - if !pass { - break - } - } - - if pass { - filteredUpstreams = append(filteredUpstreams, upstreamConfig) - } - } - - // Only add the priority to the map if there is at least one healthy upstream. - if len(filteredUpstreams) > 0 { - priorityToHealthyUpstreams[priority] = filteredUpstreams - } - } - - return priorityToHealthyUpstreams -} - // removeFilters returns a new slice of filters with the given filter type removed. func removeFilters(filters []NodeFilter, filterToRemove NodeFilterType) []NodeFilter { retFilters := make([]NodeFilter, 0) diff --git a/internal/route/filtering_strategy.go b/internal/route/filtering_strategy.go index 0c33b51..a365264 100644 --- a/internal/route/filtering_strategy.go +++ b/internal/route/filtering_strategy.go @@ -24,16 +24,34 @@ func (s *FilteringRoutingStrategy) RouteNextRequest( func (s *FilteringRoutingStrategy) filter( upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata, +) types.PriorityToUpstreamsMap { + return filterUpstreams( + upstreamsByPriority, + requestMetadata, + []NodeFilter{s.NodeFilter}, + s.Logger, + ) +} + +func filterUpstreams( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, + nodeFilters []NodeFilter, + logger *zap.Logger, ) types.PriorityToUpstreamsMap { priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap) + nodeFilter := AndFilter{ + filters: nodeFilters, + logger: logger, + } for priority, upstreamConfigs := range upstreamsByPriority { - s.Logger.Debug("Determining healthy upstreams at priority.", zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs)) + logger.Debug("Determining healthy upstreams at priority.", zap.Int("priority", priority), zap.Any("upstreams", upstreamConfigs)) filteredUpstreams := make([]*config.UpstreamConfig, 0) for _, upstreamConfig := range upstreamConfigs { - ok := s.NodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) + ok := nodeFilter.Apply(requestMetadata, upstreamConfig, len(upstreamConfigs)) if ok { filteredUpstreams = append(filteredUpstreams, upstreamConfig) } From a0442dd1e9253d302cdd62b557aa135931aa2b94 Mon Sep 17 00:00:00 2001 From: Peter Olsar Date: Fri, 4 Oct 2024 20:03:26 -0700 Subject: [PATCH 21/21] refactor: remove passive error/latency checker code --- internal/checks/checks_test.go | 5 +- internal/checks/errorlatency.go | 178 +++---------------------- internal/checks/errorlatency_test.go | 55 -------- internal/checks/manager.go | 7 - internal/config/config.go | 47 ++----- internal/mocks/ErrorLatencyChecker.go | 32 ----- internal/server/web_server_e2e_test.go | 15 ++- internal/types/types.go | 1 - 8 files changed, 44 insertions(+), 296 deletions(-) diff --git a/internal/checks/checks_test.go b/internal/checks/checks_test.go index dd501d2..3bd1a86 100644 --- a/internal/checks/checks_test.go +++ b/internal/checks/checks_test.go @@ -16,9 +16,8 @@ var defaultUpstreamConfig = &config.UpstreamConfig{ } var defaultRoutingConfig = &config.RoutingConfig{ - PassiveLatencyChecking: true, - DetectionWindow: config.NewDuration(10 * time.Minute), - BanWindow: config.NewDuration(50 * time.Minute), + DetectionWindow: config.NewDuration(10 * time.Minute), + BanWindow: config.NewDuration(50 * time.Minute), Errors: &config.ErrorsConfig{ Rate: 0.25, HTTPCodes: []string{ diff --git a/internal/checks/errorlatency.go b/internal/checks/errorlatency.go index f217476..ab50c95 100644 --- a/internal/checks/errorlatency.go +++ b/internal/checks/errorlatency.go @@ -1,7 +1,6 @@ package checks import ( - "context" "math" "net/http" "strconv" @@ -134,17 +133,16 @@ func NewCircuitBreaker( // Latency checking is disabled if `methodLatencyBreaker` is nil. // At least one of these two must be non-nil. type ErrorLatencyCheck struct { - client client.EthClient - Err error - clientGetter client.EthClientGetter - metricsContainer *metrics.Container - logger *zap.Logger - upstreamConfig *conf.UpstreamConfig - routingConfig *conf.RoutingConfig - errorCircuitBreaker ErrorCircuitBreaker - methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker - lock sync.RWMutex - ShouldRunPassiveHealthChecks bool + client client.EthClient + Err error + clientGetter client.EthClientGetter + metricsContainer *metrics.Container + logger *zap.Logger + upstreamConfig *conf.UpstreamConfig + routingConfig *conf.RoutingConfig + errorCircuitBreaker ErrorCircuitBreaker + methodLatencyBreaker map[string]LatencyCircuitBreaker // RPC method -> LatencyCircuitBreaker + lock sync.RWMutex } func NewErrorChecker( @@ -208,108 +206,14 @@ func NewErrorLatencyChecker( latencyCircuitBreaker = make(map[string]LatencyCircuitBreaker) } - c := &ErrorLatencyCheck{ - upstreamConfig: upstreamConfig, - routingConfig: routingConfig, - clientGetter: clientGetter, - metricsContainer: metricsContainer, - logger: logger, - errorCircuitBreaker: errorCircuitBreaker, - methodLatencyBreaker: latencyCircuitBreaker, - ShouldRunPassiveHealthChecks: routingConfig.PassiveLatencyChecking && (routingConfig.Errors != nil || routingConfig.Latency != nil), - } - - if c.ShouldRunPassiveHealthChecks && !(enableErrorChecking && enableLatencyChecking) { - panic("ErrorLatencyCheck must have both error and latency checking enabled for passive health checks.") - } - - if err := c.InitializePassiveCheck(); err != nil { - logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig), zap.Error(err)) - } - - return c -} - -func (c *ErrorLatencyCheck) InitializePassiveCheck() error { - if !c.ShouldRunPassiveHealthChecks { - return nil - } - - c.logger.Debug("Initializing ErrorLatencyCheck.", zap.Any("config", c.upstreamConfig)) - - httpClient, err := c.clientGetter(c.upstreamConfig.HTTPURL, &c.upstreamConfig.BasicAuthConfig, &c.upstreamConfig.RequestHeadersConfig) - if err != nil { - c.Err = err - return c.Err - } - - c.client = httpClient - - c.runPassiveCheck() - - // TODO(polsar): This check is in both PeerCheck and SyncingCheck implementations, so refactor this. - if isMethodNotSupportedErr(c.Err) { - c.logger.Debug("ErrorLatencyCheck is not supported by upstream, not running check.", zap.String("upstreamID", c.upstreamConfig.ID)) - - // Turn off the passive health check for this upstream. - c.ShouldRunPassiveHealthChecks = false - } - - return nil -} - -func (c *ErrorLatencyCheck) RunPassiveCheck() { - if !c.ShouldRunPassiveHealthChecks { - return - } - - if c.client == nil { - if err := c.InitializePassiveCheck(); err != nil { - c.logger.Error("Error initializing ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Error(err)) - c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPInit, - conf.PassiveLatencyCheckMethod, - ).Inc() - } - } - - c.runPassiveCheck() -} - -func (c *ErrorLatencyCheck) runPassiveCheck() { - if c.client == nil || !c.routingConfig.PassiveLatencyChecking { - return - } - - latencyConfig := c.routingConfig.Latency - if latencyConfig == nil { - return - } - - var wg sync.WaitGroup - defer wg.Wait() - - // Iterate over all (method, latencyThreshold) pairs and launch the check for each in parallel. - // Note that `latencyConfig.MethodLatencyThresholds` is never modified after its initialization - // in `config` package, so we don't need a lock to protect concurrent read access. - for method, latencyThreshold := range latencyConfig.MethodLatencyThresholds { - wg.Add(1) - - // Passing the loop variables as arguments is required to prevent the following lint error: - // loopclosure: loop variable method captured by func literal (govet) - go func(method string, latencyThreshold time.Duration) { - defer wg.Done() - - runCheck := func() { - c.runPassiveCheckForMethod(method, latencyThreshold) - } - - runCheckWithMetrics(runCheck, - c.metricsContainer.ErrorLatencyCheckRequests.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod), - c.metricsContainer.ErrorLatencyCheckDuration.WithLabelValues(c.upstreamConfig.ID, c.upstreamConfig.HTTPURL, conf.PassiveLatencyCheckMethod)) - }(method, latencyThreshold) + return &ErrorLatencyCheck{ + upstreamConfig: upstreamConfig, + routingConfig: routingConfig, + clientGetter: clientGetter, + metricsContainer: metricsContainer, + logger: logger, + errorCircuitBreaker: errorCircuitBreaker, + methodLatencyBreaker: latencyCircuitBreaker, } } @@ -330,48 +234,6 @@ func (c *ErrorLatencyCheck) getLatencyCircuitBreaker(method string) LatencyCircu return stats } -// This method runs the passive latency check for the specified method and latency threshold. -func (c *ErrorLatencyCheck) runPassiveCheckForMethod(method string, latencyThreshold time.Duration) { - ctx, cancel := context.WithTimeout(context.Background(), RPCRequestTimeout) - defer cancel() - - latencyBreaker := c.getLatencyCircuitBreaker(method) - - // Make and record the request. - var duration time.Duration - duration, c.Err = c.client.RecordLatency(ctx, method) - // TODO(polsar): The error must also pass the checks specified in the config - // (i.e. match HTTP code, JSON RPC code, and error message). - // Fixing this is not a priority since we're not currently using passive health checking. - isError := c.Err != nil - c.errorCircuitBreaker.RecordResponse(isError) - latencyBreaker.RecordLatency(duration) - - if isError { - c.metricsContainer.ErrorLatencyCheckErrors.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - method, - ).Inc() - } else if duration >= latencyThreshold { - c.metricsContainer.ErrorLatencyCheckHighLatencies.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - metrics.HTTPRequest, - method, - ).Inc() - } - - c.metricsContainer.ErrorLatency.WithLabelValues( - c.upstreamConfig.ID, - c.upstreamConfig.HTTPURL, - method, - ).Set(float64(duration.Milliseconds())) - - c.logger.Debug("Ran passive ErrorLatencyCheck.", zap.Any("upstreamID", c.upstreamConfig.ID), zap.Any("latency", duration), zap.Error(c.Err)) -} - // IsPassing // TODO(polsar): Split this method into two separate methods: IsPassingError and IsPassingLatency. func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { @@ -421,10 +283,6 @@ func (c *ErrorLatencyCheck) IsPassing(methods []string) bool { // RecordRequest // TODO(polsar): Split this method into two separate methods: RecordError and RecordLatency. func (c *ErrorLatencyCheck) RecordRequest(data *types.RequestData) { - if c.routingConfig.PassiveLatencyChecking { - return - } - if !c.routingConfig.IsEnhancedRoutingControlDefined() { return } diff --git a/internal/checks/errorlatency_test.go b/internal/checks/errorlatency_test.go index f9e8754..de89ee5 100644 --- a/internal/checks/errorlatency_test.go +++ b/internal/checks/errorlatency_test.go @@ -2,65 +2,10 @@ package checks import ( "testing" - "time" - "github.com/satsuma-data/node-gateway/internal/client" - "github.com/satsuma-data/node-gateway/internal/config" - "github.com/satsuma-data/node-gateway/internal/metrics" - "github.com/satsuma-data/node-gateway/internal/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "go.uber.org/zap" ) -func helperTestLatencyChecker(t *testing.T, latency1, latency2 time.Duration, pass bool) { - t.Helper() - - methods := []string{"eth_call", "eth_getLogs"} - - ethClient := mocks.NewEthClient(t) - ethClient.EXPECT().RecordLatency(mock.Anything, methods[0]).Return(latency1, nil) - ethClient.EXPECT().RecordLatency(mock.Anything, methods[1]).Return(latency2, nil) - - mockEthClientGetter := func(url string, credentials *config.BasicAuthConfig, additionalRequestHeaders *[]config.RequestHeaderConfig) (client.EthClient, error) { //nolint:nolintlint,revive // Legacy - return ethClient, nil - } - - checker := NewErrorLatencyChecker( - defaultUpstreamConfig, - defaultRoutingConfig, - mockEthClientGetter, - metrics.NewContainer(config.TestChainName), - zap.L(), - true, - true, - ) - - assert.Equal(t, pass, checker.IsPassing(methods)) - - ethClient.AssertNumberOfCalls(t, "RecordLatency", 2) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesLessThanThreshold(t *testing.T) { - helperTestLatencyChecker(t, 2*time.Millisecond, 3*time.Millisecond, true) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesJustBelowThreshold(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, (2000-1)*time.Millisecond, true) -} - -func TestLatencyChecker_TwoMethods_FirstLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10000*time.Millisecond, (2000-1)*time.Millisecond, false) -} - -func TestLatencyChecker_TwoMethods_SecondLatencyTooHigh(t *testing.T) { - helperTestLatencyChecker(t, (10000-1)*time.Millisecond, 2000*time.Millisecond, false) -} - -func TestLatencyChecker_TwoMethods_BothLatenciesTooHigh(t *testing.T) { - helperTestLatencyChecker(t, 10002*time.Millisecond, 2003*time.Millisecond, false) -} - func Test_isMatchForPatterns_True(t *testing.T) { Assert := assert.New(t) diff --git a/internal/checks/manager.go b/internal/checks/manager.go index 2c06e4f..18ce65f 100644 --- a/internal/checks/manager.go +++ b/internal/checks/manager.go @@ -291,13 +291,6 @@ func (h *healthCheckManager) runChecksOnce() { defer wg.Done() c.RunCheck() }(h.GetUpstreamStatus(config.ID).SyncingCheck) - - wg.Add(1) - - go func(c types.ErrorLatencyChecker) { - defer wg.Done() - c.RunPassiveCheck() - }(h.GetUpstreamStatus(config.ID).LatencyCheck) } wg.Wait() diff --git a/internal/config/config.go b/internal/config/config.go index 98d351a..d57aeb8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,23 +14,13 @@ import ( type NodeType string const ( - DefaultBanWindow = 5 * time.Minute - DefaultDetectionWindow = time.Minute - // DefaultMaxLatency is used when the latency threshold is not specified in the config. - // TODO(polsar): We should probably use a lower value. - DefaultMaxLatency = 10 * time.Second + DefaultBanWindow = 5 * time.Minute + DefaultDetectionWindow = time.Minute + DefaultMaxLatency = 10 * time.Second // Default latency threshold DefaultErrorRate = 0.25 DefaultLatencyTooHighRate = 0.5 // TODO(polsar): Expose this parameter in the config. Archive NodeType = "archive" Full NodeType = "full" - // PassiveLatencyCheckMethod is a dummy method we use to measure the latency of an upstream RPC endpoint. - // https://docs.infura.io/api/networks/ethereum/json-rpc-methods/eth_chainid - PassiveLatencyCheckMethod = "eth_chainId" //nolint:gosec // No hardcoded credentials here. - // PassiveLatencyChecking indicates whether to use live (active) requests as data for the LatencyChecker (false) - // or synthetic (passive) periodic requests (true). - // TODO(polsar): This setting is currently not configurable via the YAML config file. - // TODO(polsar): We may also consider a hybrid request latency/error checking using both active and passive requests. - PassiveLatencyChecking = false ) type UpstreamConfig struct { @@ -249,12 +239,7 @@ type MethodConfig struct { Threshold time.Duration `yaml:"threshold"` } -func (c *MethodConfig) isMethodConfigValid(passiveLatencyChecking bool) bool { - if passiveLatencyChecking { - // TODO(polsar): Validate the method name: https://ethereum.org/en/developers/docs/apis/json-rpc/ - return !strings.EqualFold(c.Name, PassiveLatencyCheckMethod) - } - +func (c *MethodConfig) isMethodConfigValid() bool { return true } @@ -345,13 +330,13 @@ func (c *LatencyConfig) initialize(globalConfig *RoutingConfig) { } } -func (c *LatencyConfig) isLatencyConfigValid(passiveLatencyChecking bool) bool { +func (c *LatencyConfig) isLatencyConfigValid() bool { if c == nil { return true } for _, method := range c.Methods { - if !method.isMethodConfigValid(passiveLatencyChecking) { + if !method.isMethodConfigValid() { return false } } @@ -360,14 +345,13 @@ func (c *LatencyConfig) isLatencyConfigValid(passiveLatencyChecking bool) bool { } type RoutingConfig struct { - AlwaysRoute *bool `yaml:"alwaysRoute"` - Errors *ErrorsConfig `yaml:"errors"` - Latency *LatencyConfig `yaml:"latency"` - DetectionWindow *time.Duration `yaml:"detectionWindow"` - BanWindow *time.Duration `yaml:"banWindow"` - MaxBlocksBehind int `yaml:"maxBlocksBehind"` - PassiveLatencyChecking bool - IsInitialized bool + AlwaysRoute *bool `yaml:"alwaysRoute"` + Errors *ErrorsConfig `yaml:"errors"` + Latency *LatencyConfig `yaml:"latency"` + DetectionWindow *time.Duration `yaml:"detectionWindow"` + BanWindow *time.Duration `yaml:"banWindow"` + MaxBlocksBehind int `yaml:"maxBlocksBehind"` + IsInitialized bool } // IsEnhancedRoutingControlDefined returns true iff any of the enhanced routing control fields are specified @@ -387,8 +371,6 @@ func (r *RoutingConfig) setDefaults(globalConfig *RoutingConfig, force bool) boo return true } - r.PassiveLatencyChecking = PassiveLatencyChecking - if !force && !r.IsEnhancedRoutingControlDefined() && (globalConfig == nil || !globalConfig.IsEnhancedRoutingControlDefined()) { // Routing config is not specified at either this or global level, so there is nothing to do. return false @@ -462,7 +444,7 @@ func (r *RoutingConfig) isRoutingConfigValid() bool { latency := r.Latency if latency != nil { - isValid = isValid && latency.isLatencyConfigValid(r.PassiveLatencyChecking) + isValid = isValid && latency.isLatencyConfigValid() } return isValid @@ -531,7 +513,6 @@ func (c *SingleChainConfig) isValid() bool { func (c *SingleChainConfig) setDefaults(globalConfig *GlobalConfig, isGlobalRoutingConfigSpecified bool) { if !isGlobalRoutingConfigSpecified && !c.Routing.IsEnhancedRoutingControlDefined() { - c.Routing.PassiveLatencyChecking = PassiveLatencyChecking return } diff --git a/internal/mocks/ErrorLatencyChecker.go b/internal/mocks/ErrorLatencyChecker.go index 46df736..dcd8dfc 100644 --- a/internal/mocks/ErrorLatencyChecker.go +++ b/internal/mocks/ErrorLatencyChecker.go @@ -99,38 +99,6 @@ func (_c *ErrorLatencyChecker_RecordRequest_Call) RunAndReturn(run func(*types.R return _c } -// RunPassiveCheck provides a mock function with given fields: -func (_m *ErrorLatencyChecker) RunPassiveCheck() { - _m.Called() -} - -// ErrorLatencyChecker_RunPassiveCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunPassiveCheck' -type ErrorLatencyChecker_RunPassiveCheck_Call struct { - *mock.Call -} - -// RunPassiveCheck is a helper method to define mock.On call -func (_e *ErrorLatencyChecker_Expecter) RunPassiveCheck() *ErrorLatencyChecker_RunPassiveCheck_Call { - return &ErrorLatencyChecker_RunPassiveCheck_Call{Call: _e.mock.On("RunPassiveCheck")} -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) Run(run func()) *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) Return() *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Return() - return _c -} - -func (_c *ErrorLatencyChecker_RunPassiveCheck_Call) RunAndReturn(run func()) *ErrorLatencyChecker_RunPassiveCheck_Call { - _c.Call.Return(run) - return _c -} - // NewErrorLatencyChecker creates a new instance of ErrorLatencyChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewErrorLatencyChecker(t interface { diff --git a/internal/server/web_server_e2e_test.go b/internal/server/web_server_e2e_test.go index 1c28ce4..5169b10 100644 --- a/internal/server/web_server_e2e_test.go +++ b/internal/server/web_server_e2e_test.go @@ -332,10 +332,15 @@ func executeRequest( handler.ServeHTTP(recorder, req) - result := recorder.Result() + result := recorder.Result() //nolint:bodyclose // Body is closed in the defer statement below. resultBody, _ := io.ReadAll(result.Body) - defer result.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { //nolint:errorlint // Wrong error. + t.Errorf("Error closing response body: %s", err) + } + }(result.Body) responseBody, err := jsonrpc.DecodeResponseBody(resultBody) assert.NoError(t, err) @@ -414,7 +419,7 @@ func handleSingleRequest(t *testing.T, request jsonrpc.SingleRequestBody, case "net_peerCount": return jsonrpc.SingleResponseBody{Result: getResultFromString(hexutil.Uint64(10).String())} - case config.PassiveLatencyCheckMethod: + case "eth_chainId": return jsonrpc.SingleResponseBody{Result: getResultFromString(hexutil.Uint64(11).String())} case "eth_getBlockByNumber": @@ -424,7 +429,7 @@ func handleSingleRequest(t *testing.T, request jsonrpc.SingleRequestBody, }) return jsonrpc.SingleResponseBody{ - Result: json.RawMessage(result), + Result: result, } case "eth_blockNumber": @@ -454,7 +459,7 @@ func setUpUnhealthyUpstream(t *testing.T) *httptest.Server { switch r := requestBody.(type) { case *jsonrpc.SingleRequestBody: switch requestBody.GetMethod() { - case "eth_syncing", "net_peerCount", config.PassiveLatencyCheckMethod, "eth_getBlockByNumber": + case "eth_syncing", "net_peerCount", "eth_chainId", "eth_getBlockByNumber": responseBody = &jsonrpc.SingleResponseBody{Error: &jsonrpc.Error{Message: "This is a failing fake node!"}} writeResponseBody(t, writer, responseBody) default: diff --git a/internal/types/types.go b/internal/types/types.go index 0c52397..a08c2a9 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -41,7 +41,6 @@ type Checker interface { //go:generate mockery --output ../mocks --name ErrorLatencyChecker --with-expecter type ErrorLatencyChecker interface { - RunPassiveCheck() IsPassing(methods []string) bool RecordRequest(data *RequestData) }