From 15bb16d1f2f7250986036c435cdd98ce870e9ed4 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Mon, 18 Dec 2023 18:14:20 -0800 Subject: [PATCH] fix: node gateway visbility improvements --- internal/jsonrpc/jsonrpc.go | 6 +- internal/jsonrpc/jsonrpc_test.go | 13 +++- internal/metrics/metrics.go | 2 +- internal/mocks/BlockHeightChecker.go | 23 ++++-- internal/mocks/Checker.go | 15 ++-- internal/mocks/EthClient.go | 27 +++++-- internal/mocks/HTTPClient.go | 15 ++-- internal/mocks/HealthCheckManager.go | 19 +++-- internal/mocks/Router.go | 47 ++++++------ internal/mocks/RoutingStrategy.go | 15 ++-- internal/route/request_executor.go | 97 +++++++++++++++---------- internal/route/request_executor_test.go | 19 ++--- internal/route/router.go | 36 +++------ internal/route/router_test.go | 20 ++--- internal/route/routing_strategy.go | 14 +++- internal/route/routing_strategy_test.go | 2 +- internal/server/rpc_handler.go | 21 ++++-- internal/server/web_server_test.go | 16 +--- internal/util/http.go | 18 +++++ 19 files changed, 244 insertions(+), 181 deletions(-) create mode 100644 internal/util/http.go diff --git a/internal/jsonrpc/jsonrpc.go b/internal/jsonrpc/jsonrpc.go index d0ce18a2..24ab51d2 100644 --- a/internal/jsonrpc/jsonrpc.go +++ b/internal/jsonrpc/jsonrpc.go @@ -102,14 +102,14 @@ type DecodeError struct { Content []byte // Content that couldn't be decoded. } -func NewDecodeError(err error, content []byte) DecodeError { - return DecodeError{ +func NewDecodeError(err error, content []byte) *DecodeError { + return &DecodeError{ Err: err, Content: content, } } -func (e DecodeError) Error() string { +func (e *DecodeError) Error() string { return fmt.Sprintf("decode error: %s, content: %s", e.Err.Error(), string(e.Content)) } diff --git a/internal/jsonrpc/jsonrpc_test.go b/internal/jsonrpc/jsonrpc_test.go index 5074bc93..2aeeca2e 100644 --- a/internal/jsonrpc/jsonrpc_test.go +++ b/internal/jsonrpc/jsonrpc_test.go @@ -143,6 +143,11 @@ func TestEncodeAndDecodeResponses(t *testing.T) { }, }, }, + { + testName: "empty response in batch", + body: ``, + expectedResponse: nil, + }, { testName: "batch responses", body: "[" + @@ -180,9 +185,11 @@ func TestEncodeAndDecodeResponses(t *testing.T) { assert.Nil(t, err) assert.Equal(t, tc.expectedResponse, decoded) - encoded, err := decoded.Encode() + if decoded != nil { + encoded, err := decoded.Encode() - assert.Nil(t, err) - assert.Equal(t, tc.body, string(encoded)) + assert.Nil(t, err) + assert.Equal(t, tc.body, string(encoded)) + } } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 6a5dfdef..1beb8869 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -94,7 +94,7 @@ var ( "Batches are deconstructed to single JSON RPC requests for this metric.", }, // jsonrpc_method is "batch" for batch requests - []string{"chain_name", "client", "upstream_id", "url", "jsonrpc_method", "response_code", "jsonrpc_error_code"}, + []string{"chain_name", "client", "upstream_id", "url", "jsonrpc_method", "jsonrpc_error_code"}, ) upstreamRPCDuration = promauto.NewHistogramVec( diff --git a/internal/mocks/BlockHeightChecker.go b/internal/mocks/BlockHeightChecker.go index c2761db3..a2765cf0 100644 --- a/internal/mocks/BlockHeightChecker.go +++ b/internal/mocks/BlockHeightChecker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -21,6 +21,10 @@ func (_m *BlockHeightChecker) EXPECT() *BlockHeightChecker_Expecter { func (_m *BlockHeightChecker) GetBlockHeight() uint64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetBlockHeight") + } + var r0 uint64 if rf, ok := ret.Get(0).(func() uint64); ok { r0 = rf() @@ -62,6 +66,10 @@ func (_c *BlockHeightChecker_GetBlockHeight_Call) RunAndReturn(run func() uint64 func (_m *BlockHeightChecker) GetError() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetError") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -103,6 +111,10 @@ func (_c *BlockHeightChecker_GetError_Call) RunAndReturn(run func() error) *Bloc func (_m *BlockHeightChecker) IsPassing(maxBlockHeight uint64) bool { ret := _m.Called(maxBlockHeight) + if len(ret) == 0 { + panic("no return value specified for IsPassing") + } + var r0 bool if rf, ok := ret.Get(0).(func(uint64) bool); ok { r0 = rf(maxBlockHeight) @@ -173,13 +185,12 @@ func (_c *BlockHeightChecker_RunCheck_Call) RunAndReturn(run func()) *BlockHeigh return _c } -type mockConstructorTestingTNewBlockHeightChecker interface { +// NewBlockHeightChecker creates a new instance of BlockHeightChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBlockHeightChecker(t interface { mock.TestingT Cleanup(func()) -} - -// NewBlockHeightChecker creates a new instance of BlockHeightChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBlockHeightChecker(t mockConstructorTestingTNewBlockHeightChecker) *BlockHeightChecker { +}) *BlockHeightChecker { mock := &BlockHeightChecker{} mock.Mock.Test(t) diff --git a/internal/mocks/Checker.go b/internal/mocks/Checker.go index 5b243084..79f642c6 100644 --- a/internal/mocks/Checker.go +++ b/internal/mocks/Checker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -21,6 +21,10 @@ func (_m *Checker) EXPECT() *Checker_Expecter { func (_m *Checker) IsPassing() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsPassing") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -90,13 +94,12 @@ func (_c *Checker_RunCheck_Call) RunAndReturn(run func()) *Checker_RunCheck_Call return _c } -type mockConstructorTestingTNewChecker interface { +// NewChecker creates a new instance of Checker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewChecker(t interface { mock.TestingT Cleanup(func()) -} - -// NewChecker creates a new instance of Checker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewChecker(t mockConstructorTestingTNewChecker) *Checker { +}) *Checker { mock := &Checker{} mock.Mock.Test(t) diff --git a/internal/mocks/EthClient.go b/internal/mocks/EthClient.go index 4e8a299f..b6dfb96d 100644 --- a/internal/mocks/EthClient.go +++ b/internal/mocks/EthClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -31,6 +31,10 @@ func (_m *EthClient) EXPECT() *EthClient_Expecter { func (_m *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { @@ -86,6 +90,10 @@ func (_c *EthClient_HeaderByNumber_Call) RunAndReturn(run func(context.Context, func (_m *EthClient) PeerCount(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for PeerCount") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { @@ -138,6 +146,10 @@ func (_c *EthClient_PeerCount_Call) RunAndReturn(run func(context.Context) (uint func (_m *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { ret := _m.Called(ctx, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { @@ -193,6 +205,10 @@ func (_c *EthClient_SubscribeNewHead_Call) RunAndReturn(run func(context.Context func (_m *EthClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SyncProgress") + } + var r0 *ethereum.SyncProgress var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*ethereum.SyncProgress, error)); ok { @@ -243,13 +259,12 @@ func (_c *EthClient_SyncProgress_Call) RunAndReturn(run func(context.Context) (* return _c } -type mockConstructorTestingTNewEthClient interface { +// NewEthClient creates a new instance of EthClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEthClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewEthClient creates a new instance of EthClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEthClient(t mockConstructorTestingTNewEthClient) *EthClient { +}) *EthClient { mock := &EthClient{} mock.Mock.Test(t) diff --git a/internal/mocks/HTTPClient.go b/internal/mocks/HTTPClient.go index 581cc300..fa4f597d 100644 --- a/internal/mocks/HTTPClient.go +++ b/internal/mocks/HTTPClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -17,6 +17,10 @@ type HTTPClient struct { func (_m *HTTPClient) Do(req *http.Request) (*http.Response, error) { ret := _m.Called(req) + if len(ret) == 0 { + panic("no return value specified for Do") + } + var r0 *http.Response var r1 error if rf, ok := ret.Get(0).(func(*http.Request) (*http.Response, error)); ok { @@ -39,13 +43,12 @@ func (_m *HTTPClient) Do(req *http.Request) (*http.Response, error) { return r0, r1 } -type mockConstructorTestingTNewHTTPClient interface { +// NewHTTPClient creates a new instance of HTTPClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHTTPClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewHTTPClient creates a new instance of HTTPClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHTTPClient(t mockConstructorTestingTNewHTTPClient) *HTTPClient { +}) *HTTPClient { mock := &HTTPClient{} mock.Mock.Test(t) diff --git a/internal/mocks/HealthCheckManager.go b/internal/mocks/HealthCheckManager.go index fe99cc64..483a8fdd 100644 --- a/internal/mocks/HealthCheckManager.go +++ b/internal/mocks/HealthCheckManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -24,6 +24,10 @@ func (_m *HealthCheckManager) EXPECT() *HealthCheckManager_Expecter { func (_m *HealthCheckManager) GetUpstreamStatus(upstreamID string) *types.UpstreamStatus { ret := _m.Called(upstreamID) + if len(ret) == 0 { + panic("no return value specified for GetUpstreamStatus") + } + var r0 *types.UpstreamStatus if rf, ok := ret.Get(0).(func(string) *types.UpstreamStatus); ok { r0 = rf(upstreamID) @@ -68,6 +72,10 @@ func (_c *HealthCheckManager_GetUpstreamStatus_Call) RunAndReturn(run func(strin func (_m *HealthCheckManager) IsInitialized() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsInitialized") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -137,13 +145,12 @@ func (_c *HealthCheckManager_StartHealthChecks_Call) RunAndReturn(run func()) *H return _c } -type mockConstructorTestingTNewHealthCheckManager interface { +// NewHealthCheckManager creates a new instance of HealthCheckManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHealthCheckManager(t interface { mock.TestingT Cleanup(func()) -} - -// NewHealthCheckManager creates a new instance of HealthCheckManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHealthCheckManager(t mockConstructorTestingTNewHealthCheckManager) *HealthCheckManager { +}) *HealthCheckManager { mock := &HealthCheckManager{} mock.Mock.Test(t) diff --git a/internal/mocks/Router.go b/internal/mocks/Router.go index 7516fdf3..31e4897a 100644 --- a/internal/mocks/Router.go +++ b/internal/mocks/Router.go @@ -1,10 +1,9 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks import ( context "context" - http "net/http" jsonrpc "github.com/satsuma-data/node-gateway/internal/jsonrpc" mock "github.com/stretchr/testify/mock" @@ -27,6 +26,10 @@ func (_m *Router) EXPECT() *Router_Expecter { func (_m *Router) IsInitialized() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsInitialized") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -65,14 +68,17 @@ func (_c *Router_IsInitialized_Call) RunAndReturn(run func() bool) *Router_IsIni } // Route provides a mock function with given fields: ctx, requestBody -func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error) { +func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error) { ret := _m.Called(ctx, requestBody) + if len(ret) == 0 { + panic("no return value specified for Route") + } + var r0 string var r1 jsonrpc.ResponseBody - var r2 *http.Response - var r3 error - if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error)); ok { + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error)); ok { return rf(ctx, requestBody) } if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) string); ok { @@ -89,21 +95,13 @@ func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (s } } - if rf, ok := ret.Get(2).(func(context.Context, jsonrpc.RequestBody) *http.Response); ok { + if rf, ok := ret.Get(2).(func(context.Context, jsonrpc.RequestBody) error); ok { r2 = rf(ctx, requestBody) } else { - if ret.Get(2) != nil { - r2 = ret.Get(2).(*http.Response) - } - } - - if rf, ok := ret.Get(3).(func(context.Context, jsonrpc.RequestBody) error); ok { - r3 = rf(ctx, requestBody) - } else { - r3 = ret.Error(3) + r2 = ret.Error(2) } - return r0, r1, r2, r3 + return r0, r1, r2 } // Router_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route' @@ -125,12 +123,12 @@ func (_c *Router_Route_Call) Run(run func(ctx context.Context, requestBody jsonr return _c } -func (_c *Router_Route_Call) Return(_a0 string, _a1 jsonrpc.ResponseBody, _a2 *http.Response, _a3 error) *Router_Route_Call { - _c.Call.Return(_a0, _a1, _a2, _a3) +func (_c *Router_Route_Call) Return(_a0 string, _a1 jsonrpc.ResponseBody, _a2 error) *Router_Route_Call { + _c.Call.Return(_a0, _a1, _a2) return _c } -func (_c *Router_Route_Call) RunAndReturn(run func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error)) *Router_Route_Call { +func (_c *Router_Route_Call) RunAndReturn(run func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error)) *Router_Route_Call { _c.Call.Return(run) return _c } @@ -167,13 +165,12 @@ func (_c *Router_Start_Call) RunAndReturn(run func()) *Router_Start_Call { return _c } -type mockConstructorTestingTNewRouter interface { +// NewRouter creates a new instance of Router. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRouter(t interface { mock.TestingT Cleanup(func()) -} - -// NewRouter creates a new instance of Router. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewRouter(t mockConstructorTestingTNewRouter) *Router { +}) *Router { mock := &Router{} mock.Mock.Test(t) diff --git a/internal/mocks/RoutingStrategy.go b/internal/mocks/RoutingStrategy.go index ca4824d7..6f107b43 100644 --- a/internal/mocks/RoutingStrategy.go +++ b/internal/mocks/RoutingStrategy.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -26,6 +26,10 @@ func (_m *MockRoutingStrategy) EXPECT() *MockRoutingStrategy_Expecter { func (_m *MockRoutingStrategy) RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata) (string, error) { ret := _m.Called(upstreamsByPriority, requestMetadata) + if len(ret) == 0 { + panic("no return value specified for RouteNextRequest") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(types.PriorityToUpstreamsMap, metadata.RequestMetadata) (string, error)); ok { @@ -75,13 +79,12 @@ func (_c *MockRoutingStrategy_RouteNextRequest_Call) RunAndReturn(run func(types return _c } -type mockConstructorTestingTNewMockRoutingStrategy interface { +// NewMockRoutingStrategy creates a new instance of MockRoutingStrategy. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRoutingStrategy(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockRoutingStrategy creates a new instance of MockRoutingStrategy. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRoutingStrategy(t mockConstructorTestingTNewMockRoutingStrategy) *MockRoutingStrategy { +}) *MockRoutingStrategy { mock := &MockRoutingStrategy{} mock.Mock.Test(t) diff --git a/internal/route/request_executor.go b/internal/route/request_executor.go index 77bf220e..efaaf257 100644 --- a/internal/route/request_executor.go +++ b/internal/route/request_executor.go @@ -14,6 +14,7 @@ import ( "github.com/satsuma-data/node-gateway/internal/client" "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/jsonrpc" + "github.com/satsuma-data/node-gateway/internal/util" "go.uber.org/zap" ) @@ -34,18 +35,31 @@ func (e *HandledError) Error() string { } type OriginError struct { - err error + err error + response string + ResponseCode int +} + +type HTTPResponse struct { + StatusCode int } func (e *OriginError) Error() string { return fmt.Sprintf("error making request to origin: %v", e.err) } +/* + * Return arguments + * 1. JSON Response body - Body of response decoded into JSON RPC, if possible. + * 2. HTTP Response - HTTP response from upstream, if possible. + * 3. Cached - Whether the response was cached. + * 4. Error - Error encountered when making request to upstream or another error encountered during processing. + */ func (r *RequestExecutor) routeToConfig( ctx context.Context, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig, -) (jsonrpc.ResponseBody, *http.Response, bool, error) { +) (jsonrpc.ResponseBody, *HTTPResponse, bool, error) { bodyBytes, err := requestBody.Encode() if err != nil { r.logger.Error("Could not serialize request.", zap.Any("request", requestBody), zap.Error(err)) @@ -74,7 +88,7 @@ func (r *RequestExecutor) routeToConfig( var ( respBody jsonrpc.ResponseBody - resp *http.Response + resp *HTTPResponse ) singleRequestBody, isSingleRequestBody := requestBody.(*jsonrpc.SingleRequestBody) @@ -85,27 +99,24 @@ func (r *RequestExecutor) routeToConfig( // We must clone the httpReq otherwise the body will already be closed on the second request. respBody, resp, cached, err = r.retrieveOrCacheRequest(cloneRequest(httpReq), *singleRequestBody, configToRoute) if err != nil { - originError, _ := err.(*OriginError) - // An OriginError indicates a cache miss and request failure to origin. - // We want this error to bubble up. - // Unknown cache errors will default back to the "non-caching" behavior. - if originError != nil { + switch e := err.(type) { + case *OriginError, *jsonrpc.DecodeError: + // These errors indicates a cache miss and request failure to origin. + // We want this error to bubble up. + // Unknown cache errors will default back to the "non-caching" behavior. r.logger.Warn("caching error making request to origin", zap.Error(err), zap.Any("request", requestBody)) - return nil, nil, cached, originError + return nil, nil, cached, e + default: + r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody)) } - - r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody)) } else { return respBody, resp, cached, nil } } respBody, resp, err = r.getResponseBody(httpReq, requestBody, configToRoute) - if err != nil { - return nil, nil, false, err - } - return respBody, resp, false, nil + return respBody, resp, false, err } func (r *RequestExecutor) useCache(requestBody jsonrpc.RequestBody) bool { @@ -116,22 +127,22 @@ func (r *RequestExecutor) useCache(requestBody jsonrpc.RequestBody) bool { return r.cache.ShouldCacheMethod(requestBody.GetMethod()) } -func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestBody jsonrpc.SingleRequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *http.Response, bool, error) { +func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestBody jsonrpc.SingleRequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *HTTPResponse, bool, error) { var ( - respBody jsonrpc.ResponseBody - resp *http.Response + jsonRPCRespBody jsonrpc.ResponseBody + httpResp *HTTPResponse ) originFunc := func() (*jsonrpc.SingleResponseBody, error) { var err error // Any errors will result in respBody and resp being nil. - respBody, resp, err = r.getResponseBody(httpReq, &requestBody, configToRoute) //nolint:bodyclose // linter bug + jsonRPCRespBody, httpResp, err = r.getResponseBody(httpReq, &requestBody, configToRoute) if err != nil { return nil, err } - singleRespBody, ok := respBody.(*jsonrpc.SingleResponseBody) + singleRespBody, ok := jsonRPCRespBody.(*jsonrpc.SingleResponseBody) if !ok { return nil, errors.New("batched responses do not support caching") } @@ -157,22 +168,21 @@ func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestB case *HandledError: // The cache uses request coalescing, an error may be returned by another goroutine. // Construct a responsebody and a fake response. - if resp == nil && respBody == nil { - resp = &http.Response{ + if httpResp == nil && jsonRPCRespBody == nil { + httpResp = &HTTPResponse{ StatusCode: http.StatusOK, - Body: io.NopCloser(new(bytes.Buffer)), } rb := *err.rb rb.ID = *requestBody.ID rb.JSONRPC = requestBody.JSONRPCVersion - respBody = &rb + jsonRPCRespBody = &rb } default: return nil, nil, cached, err } } - if resp == nil && respBody == nil { + if httpResp == nil && jsonRPCRespBody == nil { if val == nil { return nil, nil, cached, fmt.Errorf("unexpected empty response from cache") } @@ -180,41 +190,54 @@ func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestB r.logger.Debug("cache hit", zap.Any("request", requestBody), zap.Any("value", val)) // Fill in id and jsonrpc in the respBody to match the request. - resp = &http.Response{ + httpResp = &HTTPResponse{ StatusCode: http.StatusOK, - Body: io.NopCloser(new(bytes.Buffer)), } - respBody = &jsonrpc.SingleResponseBody{ + jsonRPCRespBody = &jsonrpc.SingleResponseBody{ ID: *requestBody.ID, JSONRPC: requestBody.JSONRPCVersion, Result: val, } } - return respBody, resp, cached, nil + return jsonRPCRespBody, httpResp, cached, nil } -func (r *RequestExecutor) getResponseBody(httpReq *http.Request, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *http.Response, error) { +func (r *RequestExecutor) getResponseBody(httpReq *http.Request, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *HTTPResponse, error) { resp, err := r.httpClient.Do(httpReq) if err != nil { r.logger.Error("Error encountered when executing request.", zap.Any("request", requestBody), - zap.String("upstreamID", configToRoute.ID), zap.String("response", fmt.Sprintf("%v", resp)), zap.Error(err)) - return nil, nil, &OriginError{err} + zap.String("upstreamID", configToRoute.ID), zap.Error(err)) + + return nil, nil, &OriginError{err, "", 0} } + + // Body can only be read once. Read it out and put it back in the response. + respBodyString := util.ReadAndCopyBackResponseBody(resp) + httpResp := &HTTPResponse{resp.StatusCode} + + if resp.StatusCode >= http.StatusBadRequest { + r.logger.Error("Non-2xx/3xx status code encountered when executing request.", zap.Any("request", requestBody), + zap.String("upstreamID", configToRoute.ID), zap.String("response", respBodyString), + zap.Int("httpStatusCode", resp.StatusCode), zap.Error(err)) + + return nil, httpResp, &OriginError{nil, respBodyString, resp.StatusCode} + } + defer resp.Body.Close() - respBody, err := jsonrpc.DecodeResponseBody(resp) + jsonRPCBody, err := jsonrpc.DecodeResponseBody(resp) if err != nil { r.logger.Warn("Could not deserialize response.", zap.Any("request", requestBody), - zap.String("upstreamID", configToRoute.ID), zap.String("response", fmt.Sprintf("%v", resp)), zap.Error(err)) - return nil, nil, &OriginError{err} + zap.String("upstreamID", configToRoute.ID), zap.String("response", respBodyString), zap.Error(err)) + return nil, httpResp, err } - r.logger.Debug("Successfully routed request to upstream.", zap.String("upstreamID", configToRoute.ID), zap.Any("request", requestBody), zap.Any("response", respBody)) + r.logger.Debug("Successfully routed request to upstream.", zap.String("upstreamID", configToRoute.ID), zap.Any("request", requestBody), zap.Any("response", jsonRPCBody)) - return respBody, resp, nil + return jsonRPCBody, httpResp, nil } func cloneRequest(r *http.Request) *http.Request { diff --git a/internal/route/request_executor_test.go b/internal/route/request_executor_test.go index 45494977..23542942 100644 --- a/internal/route/request_executor_test.go +++ b/internal/route/request_executor_test.go @@ -70,15 +70,15 @@ func TestRetrieveOrCacheRequest(t *testing.T) { expected, _ := rpcCache.Marshal(raw) redisClientMock.ExpectSet(cacheKey, expected, cacheConfig.TTL).SetVal("OK") - respBody, resp, cached, _ := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() + jsonRPCResponseBody, httpResponse, cached, _ := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - singleRespBody := respBody.GetSubResponses()[0] + singleRespBody := jsonRPCResponseBody.GetSubResponses()[0] assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Nil(t, singleRespBody.Error) assert.Equal(t, raw, singleRespBody.Result) + assert.Equal(t, httpResp.StatusCode, httpResponse.StatusCode) assert.False(t, cached) // Send a new request with new ID. @@ -90,15 +90,15 @@ func TestRetrieveOrCacheRequest(t *testing.T) { // SetVal simulates returned value on a Get cache hit. redisClientMock.ExpectGet(cacheKey).SetVal(bytes.NewBuffer(expected).String()) - respBody, resp, cached, _ = executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() + jsonRPCResponseBody, httpResponse, cached, _ = executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - singleRespBody = respBody.GetSubResponses()[0] + singleRespBody = jsonRPCResponseBody.GetSubResponses()[0] assert.Equal(t, int64(20), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Nil(t, singleRespBody.Error) assert.Equal(t, raw, singleRespBody.Result) + assert.Equal(t, httpResp.StatusCode, httpResponse.StatusCode) assert.True(t, cached) if err := redisClientMock.ExpectationsWereMet(); err != nil { @@ -144,9 +144,6 @@ func TestRetrieveOrCacheRequest_OriginError(t *testing.T) { httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - if resp != nil { - resp.Body.Close() - } assert.Nil(t, respBody) assert.Nil(t, resp) @@ -191,11 +188,11 @@ func TestRetrieveOrCacheRequest_JSONRPCError(t *testing.T) { bodyBytes, _ := requestBody.Encode() httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() singleRespBody := respBody.GetSubResponses()[0] assert.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode) assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Equal(t, "RPC error", singleRespBody.Error.Message) @@ -238,11 +235,11 @@ func TestRetrieveOrCacheRequest_NullResultError(t *testing.T) { bodyBytes, _ := requestBody.Encode() httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() singleRespBody := respBody.GetSubResponses()[0] assert.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode) assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Equal(t, json.RawMessage("null"), singleRespBody.Result) diff --git a/internal/route/router.go b/internal/route/router.go index 9ccc01f5..8ccd95b6 100644 --- a/internal/route/router.go +++ b/internal/route/router.go @@ -1,15 +1,12 @@ package route import ( - "bytes" "context" - "errors" "github.com/satsuma-data/node-gateway/internal/cache" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" - "io" "net/http" "strconv" "time" @@ -31,7 +28,7 @@ import ( type Router interface { Start() IsInitialized() bool - Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error) + Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error) } type SimpleRouter struct { @@ -111,22 +108,12 @@ func (r *SimpleRouter) IsInitialized() bool { func (r *SimpleRouter) Route( ctx context.Context, requestBody jsonrpc.RequestBody, -) (string, jsonrpc.ResponseBody, *http.Response, error) { +) (string, jsonrpc.ResponseBody, error) { requestMetadata := r.metadataParser.Parse(requestBody) upstreamID, err := r.routingStrategy.RouteNextRequest(r.priorityToUpstreams, requestMetadata) if err != nil { - switch { - case errors.Is(err, ErrNoHealthyUpstreams): - httpResp := &http.Response{ - StatusCode: http.StatusServiceUnavailable, - Body: io.NopCloser(bytes.NewBufferString(err.Error())), - } - - return "", nil, httpResp, nil - default: - return "", nil, nil, err - } + return "", nil, err } var configToRoute config.UpstreamConfig @@ -140,11 +127,11 @@ func (r *SimpleRouter) Route( r.logger.Debug("Routing request to upstream.", zap.String("upstreamID", upstreamID), zap.Any("request", requestBody), zap.String("client", util.GetClientFromContext(ctx))) start := time.Now() - body, response, cached, err := r.requestExecutor.routeToConfig(ctx, requestBody, &configToRoute) + jsonRPCResponse, httpResponse, cached, err := r.requestExecutor.routeToConfig(ctx, requestBody, &configToRoute) HTTPReponseCode := "" - if response != nil { - HTTPReponseCode = strconv.Itoa(response.StatusCode) + if httpResponse != nil { + HTTPReponseCode = strconv.Itoa(httpResponse.StatusCode) } r.metricsContainer.UpstreamRPCRequestsTotal.WithLabelValues( @@ -155,7 +142,7 @@ func (r *SimpleRouter) Route( strconv.FormatBool(cached), ).Inc() - if err != nil { + if err != nil || httpResponse.StatusCode >= http.StatusBadRequest { r.metricsContainer.UpstreamRPCRequestErrorsTotal.WithLabelValues( util.GetClientFromContext(ctx), upstreamID, @@ -165,7 +152,7 @@ func (r *SimpleRouter) Route( ).Inc() } - if body != nil { + if jsonRPCResponse != nil { // To help correlate request IDs to responses. // It's the responsibility of the client to provide unique IDs. reqIDToRequestMap := make(map[int64]jsonrpc.SingleRequestBody) @@ -180,9 +167,9 @@ func (r *SimpleRouter) Route( reqIDToRequestMap[*req.ID] = req } - for _, resp := range body.GetSubResponses() { + for _, resp := range jsonRPCResponse.GetSubResponses() { if resp.Error != nil { - zap.L().Warn("Encountered error in upstream JSONRPC response.", + r.logger.Warn("Encountered error in upstream JSONRPC response.", zap.Any("request", requestBody), zap.Any("error", resp.Error), zap.String("client", util.GetClientFromContext(ctx)), zap.String("upstreamID", upstreamID)) @@ -196,7 +183,6 @@ func (r *SimpleRouter) Route( upstreamID, configToRoute.HTTPURL, reqIDToRequestMap[resp.ID].Method, - HTTPReponseCode, strconv.Itoa(resp.Error.Code), ).Inc() } @@ -211,5 +197,5 @@ func (r *SimpleRouter) Route( HTTPReponseCode, ).Observe(time.Since(start).Seconds()) - return upstreamID, body, response, err + return upstreamID, jsonRPCResponse, err } diff --git a/internal/route/router_test.go b/internal/route/router_test.go index 97ae667e..354d55fc 100644 --- a/internal/route/router_test.go +++ b/internal/route/router_test.go @@ -35,24 +35,16 @@ func TestRouter_NoHealthyUpstreams(t *testing.T) { cacheConfig := config.ChainCacheConfig{} routingStrategy := mocks.NewMockRoutingStrategy(t) - routingStrategy.EXPECT().RouteNextRequest(mock.Anything, mock.Anything).Return("", ErrNoHealthyUpstreams) + routingStrategy.EXPECT().RouteNextRequest(mock.Anything, mock.Anything).Return("", DefaultNoHealthyUpstreamsError) router := NewRouter("mainnet", cacheConfig, upstreamConfigs, make([]config.GroupConfig, 0), metadata.NewChainMetadataStore(), managerMock, routingStrategy, metrics.NewContainer(config.TestChainName), zap.L(), nil) router.(*SimpleRouter).healthCheckManager = managerMock router.Start() - _, jsonResp, httpResp, err := router.Route(context.Background(), &jsonrpc.BatchRequestBody{}) - defer httpResp.Body.Close() + _, jsonResp, err := router.Route(context.Background(), &jsonrpc.BatchRequestBody{}) assert.Nil(t, jsonResp) - assert.Equal(t, 503, httpResp.StatusCode) - assert.Equal(t, "no healthy upstreams", readyBody(httpResp.Body)) - assert.Nil(t, err) -} - -func readyBody(body io.ReadCloser) string { - bodyBytes, _ := io.ReadAll(body) - return string(bodyBytes) + assert.Equal(t, DefaultNoHealthyUpstreamsError, err) } func TestRouter_GroupUpstreamsByPriority(t *testing.T) { @@ -116,8 +108,7 @@ func TestRouter_GroupUpstreamsByPriority(t *testing.T) { router.(*SimpleRouter).requestExecutor.httpClient = httpClientMock router.(*SimpleRouter).routingStrategy = routingStrategyMock - upstreamID, jsonRPCResp, httpResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) - defer httpResp.Body.Close() + upstreamID, jsonRPCResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) assert.Nil(t, err) assert.Equal(t, erigonConfig.ID, upstreamID) @@ -163,8 +154,7 @@ func TestGroupUpstreamsByPriority_NoGroups(t *testing.T) { router.(*SimpleRouter).requestExecutor.httpClient = httpClientMock router.(*SimpleRouter).routingStrategy = routingStrategyMock - upstreamID, jsonRPCResp, httpResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) - defer httpResp.Body.Close() + upstreamID, jsonRPCResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) assert.Nil(t, err) assert.Equal(t, erigonConfig.ID, upstreamID) diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index 2fd9a434..9a59c74c 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -1,8 +1,6 @@ package route import ( - "errors" - "sort" "sync/atomic" @@ -32,7 +30,15 @@ func NewPriorityRoundRobinStrategy(logger *zap.Logger) *PriorityRoundRobinStrate } } -var ErrNoHealthyUpstreams = errors.New("no healthy upstreams") +type NoHealthyUpstreamsError struct { + msg string +} + +var DefaultNoHealthyUpstreamsError = &NoHealthyUpstreamsError{"no healthy upstreams found"} + +func (e *NoHealthyUpstreamsError) Error() string { + return e.msg +} func (s *PriorityRoundRobinStrategy) RouteNextRequest( upstreamsByPriority types.PriorityToUpstreamsMap, @@ -53,5 +59,5 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( s.logger.Debug("Did not find any healthy nodes in priority.", zap.Int("priority", priority)) } - return "", ErrNoHealthyUpstreams + return "", DefaultNoHealthyUpstreamsError } diff --git a/internal/route/routing_strategy_test.go b/internal/route/routing_strategy_test.go index d7aaa253..96b7025d 100644 --- a/internal/route/routing_strategy_test.go +++ b/internal/route/routing_strategy_test.go @@ -62,6 +62,6 @@ func TestPriorityStrategy_NoUpstreams(t *testing.T) { for i := 0; i < 10; i++ { upstreamID, err := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "", upstreamID) - assert.True(t, errors.Is(err, ErrNoHealthyUpstreams)) + assert.True(t, errors.Is(err, DefaultNoHealthyUpstreamsError)) } } diff --git a/internal/server/rpc_handler.go b/internal/server/rpc_handler.go index 1e3e9e90..7516e1c6 100644 --- a/internal/server/rpc_handler.go +++ b/internal/server/rpc_handler.go @@ -55,26 +55,31 @@ func (h *RPCHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { h.logger.Debug("Request received.", zap.String("method", req.Method), zap.String("path", req.URL.Path), zap.String("query", req.URL.RawQuery), zap.Any("body", requestBody)) - upstreamID, respBody, resp, err := h.router.Route(ctx, requestBody) - if resp != nil { - defer resp.Body.Close() - } + upstreamID, jsonRPCRespBody, err := h.router.Route(ctx, requestBody) if err != nil { switch e := err.(type) { - case jsonrpc.DecodeError: + // Still pass the response to client if we're not able to decode response from upstream. + case *jsonrpc.DecodeError: respondRaw(nil, writer, e.Content, http.StatusOK) return + case *route.NoHealthyUpstreamsError: + respondJSON(h.logger, writer, "No healthy upstreams.", http.StatusServiceUnavailable) + return default: - resp := jsonrpc.CreateErrorJSONRPCResponseBodyWithRequest(fmt.Sprintf("Request could not be routed, err: %s", err.Error()), jsonrpc.InternalServerErrorCode, requestBody) - respondJSONRPC(h.logger, writer, resp, http.StatusInternalServerError) + statusCode := http.StatusInternalServerError + if originErr, ok := err.(*route.OriginError); ok && originErr.ResponseCode != 0 { + statusCode = originErr.ResponseCode + } + + respondJSON(h.logger, writer, fmt.Sprintf("Request could not be routed, err: %s", err.Error()), statusCode) return } } writer.Header().Set("X-Upstream-ID", upstreamID) - respondJSONRPC(h.logger, writer, respBody, resp.StatusCode) + respondJSONRPC(h.logger, writer, jsonRPCRespBody, http.StatusOK) h.logger.Debug("Request successfully routed.", zap.Any("requestBody", requestBody)) } diff --git a/internal/server/web_server_test.go b/internal/server/web_server_test.go index 6f089204..50e6a321 100644 --- a/internal/server/web_server_test.go +++ b/internal/server/web_server_test.go @@ -7,7 +7,6 @@ import ( "io" "net/http" "net/http/httptest" - "strings" "testing" "github.com/satsuma-data/node-gateway/internal/config" @@ -28,10 +27,7 @@ func TestHandleJSONRPCRequest_Success(t *testing.T) { } router.EXPECT().Route(mock.Anything, mock.Anything). Return("fakeUpstream", expectedRPCResponse, - &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(strings.NewReader("dummy")), - }, nil) + nil) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} @@ -126,11 +122,7 @@ func TestHandleJSONRPCRequest_NilJSONRPCResponse(t *testing.T) { router := mocks.NewRouter(t) router.EXPECT().Route(mock.Anything, mock.Anything). - Return("", nil, - &http.Response{ - StatusCode: http.StatusAccepted, - Body: io.NopCloser(strings.NewReader("dummy")), - }, nil) + Return("", nil, nil) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} @@ -145,7 +137,7 @@ func TestHandleJSONRPCRequest_NilJSONRPCResponse(t *testing.T) { result := recorder.Result() defer result.Body.Close() - assert.Equal(t, http.StatusAccepted, result.StatusCode) + assert.Equal(t, http.StatusOK, result.StatusCode) body, _ := io.ReadAll(result.Body) assert.Empty(t, body) } @@ -155,7 +147,7 @@ func TestHandleJSONRPCRequest_JSONRPCDecodeError(t *testing.T) { undecodableContent := []byte("content") router.EXPECT().Route(mock.Anything, mock.Anything). - Return("", nil, nil, jsonrpc.DecodeError{Err: errors.New("error decoding"), Content: undecodableContent}) + Return("", nil, &jsonrpc.DecodeError{Err: errors.New("error decoding"), Content: undecodableContent}) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} diff --git a/internal/util/http.go b/internal/util/http.go new file mode 100644 index 00000000..b0b1440b --- /dev/null +++ b/internal/util/http.go @@ -0,0 +1,18 @@ +package util + +import ( + "bytes" + "io" + "net/http" +) + +func ReadAndCopyBackResponseBody(resp *http.Response) string { + respBody, _ := io.ReadAll(resp.Body) + respBodyString := string(respBody) + + resp.Body.Close() + + resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) + + return respBodyString +}