From 240b9e03c2a7374b9eac7468c91c1b886f1359e9 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Mon, 18 Dec 2023 18:14:20 -0800 Subject: [PATCH 1/6] fix: node gateway error handling improvements --- internal/jsonrpc/jsonrpc.go | 50 +++-------- internal/jsonrpc/jsonrpc_test.go | 27 +++--- internal/metrics/metrics.go | 2 +- internal/mocks/BlockHeightChecker.go | 23 +++-- internal/mocks/Checker.go | 15 ++-- internal/mocks/EthClient.go | 27 ++++-- internal/mocks/HTTPClient.go | 15 ++-- internal/mocks/HealthCheckManager.go | 19 +++-- internal/mocks/Router.go | 47 +++++------ internal/mocks/RoutingStrategy.go | 15 ++-- internal/route/request_executor.go | 106 +++++++++++++++--------- internal/route/request_executor_test.go | 19 ++--- internal/route/router.go | 36 +++----- internal/route/router_test.go | 20 ++--- internal/route/routing_strategy.go | 14 +++- internal/route/routing_strategy_test.go | 2 +- internal/server/rpc_handler.go | 33 ++++++-- internal/server/web_server_e2e_test.go | 15 +++- internal/server/web_server_test.go | 20 ++--- internal/util/http.go | 20 +++++ 20 files changed, 293 insertions(+), 232 deletions(-) create mode 100644 internal/util/http.go diff --git a/internal/jsonrpc/jsonrpc.go b/internal/jsonrpc/jsonrpc.go index d0ce18a2..243d2edf 100644 --- a/internal/jsonrpc/jsonrpc.go +++ b/internal/jsonrpc/jsonrpc.go @@ -3,9 +3,8 @@ package jsonrpc import ( "bytes" "encoding/json" + "errors" "fmt" - "io" - "net/http" ) const JSONRPCVersion = "2.0" @@ -102,73 +101,50 @@ type DecodeError struct { Content []byte // Content that couldn't be decoded. } -func NewDecodeError(err error, content []byte) DecodeError { - return DecodeError{ +func NewDecodeError(err error, content []byte) *DecodeError { + return &DecodeError{ Err: err, Content: content, } } -func (e DecodeError) Error() string { +func (e *DecodeError) Error() string { return fmt.Sprintf("decode error: %s, content: %s", e.Err.Error(), string(e.Content)) } -func DecodeRequestBody(req *http.Request) (RequestBody, error) { - // No need to close the request body, the Server implementation will take care of it. - requestRawBytes, err := io.ReadAll(req.Body) - - if err != nil { - return nil, NewDecodeError(err, requestRawBytes) - } - - var body *SingleRequestBody - +func DecodeRequestBody(requestBodyRawBytes []byte) (RequestBody, error) { // Try non-batch first as these are probably more common. - if body, err = decode[SingleRequestBody](requestRawBytes); err == nil { + if body, err := decode[SingleRequestBody](requestBodyRawBytes); err == nil { return body, nil } - var batchBody *[]SingleRequestBody - - if batchBody, err = decode[[]SingleRequestBody](requestRawBytes); err == nil { + if batchBody, err := decode[[]SingleRequestBody](requestBodyRawBytes); err == nil { return &BatchRequestBody{ Requests: *batchBody, }, nil } - return nil, NewDecodeError(err, requestRawBytes) + return nil, NewDecodeError(errors.New("unexpected decoding request error"), requestBodyRawBytes) } -func DecodeResponseBody(resp *http.Response) (ResponseBody, error) { - // As per the spec, it is the caller's responsibility to close the response body. - defer resp.Body.Close() - responseRawBytes, err := io.ReadAll(resp.Body) - - if err != nil { - return nil, NewDecodeError(err, responseRawBytes) - } - +func DecodeResponseBody(responseBodyRawBytes []byte) (ResponseBody, error) { // Empty JSON RPC responses are valid for "Notifications" (requests without "ID") https://www.jsonrpc.org/specification#notification - if len(responseRawBytes) == 0 { + if len(responseBodyRawBytes) == 0 { return nil, nil } - var body *SingleResponseBody - // Try non-batch first as these are probably more common. - if body, err = decode[SingleResponseBody](responseRawBytes); err == nil { + if body, err := decode[SingleResponseBody](responseBodyRawBytes); err == nil { return body, nil } - var batchBody *[]SingleResponseBody - - if batchBody, err = decode[[]SingleResponseBody](responseRawBytes); err == nil { + if batchBody, err := decode[[]SingleResponseBody](responseBodyRawBytes); err == nil { return &BatchResponseBody{ Responses: *batchBody, }, nil } - return nil, NewDecodeError(err, responseRawBytes) + return nil, NewDecodeError(errors.New("unexpected decoding response error"), responseBodyRawBytes) } func decode[T Decodable](rawBytes []byte) (*T, error) { diff --git a/internal/jsonrpc/jsonrpc_test.go b/internal/jsonrpc/jsonrpc_test.go index 5074bc93..f031b644 100644 --- a/internal/jsonrpc/jsonrpc_test.go +++ b/internal/jsonrpc/jsonrpc_test.go @@ -1,10 +1,7 @@ package jsonrpc import ( - "bytes" "encoding/json" - "io" - "net/http" "testing" "github.com/samber/lo" @@ -91,10 +88,7 @@ func TestEncodeAndDecodeRequests(t *testing.T) { }, }, } { - req := http.Request{ - Body: io.NopCloser(bytes.NewReader([]byte(tc.body))), - } - decoded, err := DecodeRequestBody(&req) + decoded, err := DecodeRequestBody([]byte(tc.body)) assert.Nil(t, err) assert.Equal(t, tc.expectedRequest, decoded) @@ -143,6 +137,11 @@ func TestEncodeAndDecodeResponses(t *testing.T) { }, }, }, + { + testName: "empty response in batch", + body: ``, + expectedResponse: nil, + }, { testName: "batch responses", body: "[" + @@ -171,18 +170,16 @@ func TestEncodeAndDecodeResponses(t *testing.T) { }, }, } { - resp := http.Response{ - Body: io.NopCloser(bytes.NewReader([]byte(tc.body))), - } - - decoded, err := DecodeResponseBody(&resp) + decoded, err := DecodeResponseBody([]byte(tc.body)) assert.Nil(t, err) assert.Equal(t, tc.expectedResponse, decoded) - encoded, err := decoded.Encode() + if decoded != nil { + encoded, err := decoded.Encode() - assert.Nil(t, err) - assert.Equal(t, tc.body, string(encoded)) + assert.Nil(t, err) + assert.Equal(t, tc.body, string(encoded)) + } } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 6a5dfdef..1beb8869 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -94,7 +94,7 @@ var ( "Batches are deconstructed to single JSON RPC requests for this metric.", }, // jsonrpc_method is "batch" for batch requests - []string{"chain_name", "client", "upstream_id", "url", "jsonrpc_method", "response_code", "jsonrpc_error_code"}, + []string{"chain_name", "client", "upstream_id", "url", "jsonrpc_method", "jsonrpc_error_code"}, ) upstreamRPCDuration = promauto.NewHistogramVec( diff --git a/internal/mocks/BlockHeightChecker.go b/internal/mocks/BlockHeightChecker.go index c2761db3..a2765cf0 100644 --- a/internal/mocks/BlockHeightChecker.go +++ b/internal/mocks/BlockHeightChecker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -21,6 +21,10 @@ func (_m *BlockHeightChecker) EXPECT() *BlockHeightChecker_Expecter { func (_m *BlockHeightChecker) GetBlockHeight() uint64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetBlockHeight") + } + var r0 uint64 if rf, ok := ret.Get(0).(func() uint64); ok { r0 = rf() @@ -62,6 +66,10 @@ func (_c *BlockHeightChecker_GetBlockHeight_Call) RunAndReturn(run func() uint64 func (_m *BlockHeightChecker) GetError() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetError") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -103,6 +111,10 @@ func (_c *BlockHeightChecker_GetError_Call) RunAndReturn(run func() error) *Bloc func (_m *BlockHeightChecker) IsPassing(maxBlockHeight uint64) bool { ret := _m.Called(maxBlockHeight) + if len(ret) == 0 { + panic("no return value specified for IsPassing") + } + var r0 bool if rf, ok := ret.Get(0).(func(uint64) bool); ok { r0 = rf(maxBlockHeight) @@ -173,13 +185,12 @@ func (_c *BlockHeightChecker_RunCheck_Call) RunAndReturn(run func()) *BlockHeigh return _c } -type mockConstructorTestingTNewBlockHeightChecker interface { +// NewBlockHeightChecker creates a new instance of BlockHeightChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBlockHeightChecker(t interface { mock.TestingT Cleanup(func()) -} - -// NewBlockHeightChecker creates a new instance of BlockHeightChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewBlockHeightChecker(t mockConstructorTestingTNewBlockHeightChecker) *BlockHeightChecker { +}) *BlockHeightChecker { mock := &BlockHeightChecker{} mock.Mock.Test(t) diff --git a/internal/mocks/Checker.go b/internal/mocks/Checker.go index 5b243084..79f642c6 100644 --- a/internal/mocks/Checker.go +++ b/internal/mocks/Checker.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -21,6 +21,10 @@ func (_m *Checker) EXPECT() *Checker_Expecter { func (_m *Checker) IsPassing() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsPassing") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -90,13 +94,12 @@ func (_c *Checker_RunCheck_Call) RunAndReturn(run func()) *Checker_RunCheck_Call return _c } -type mockConstructorTestingTNewChecker interface { +// NewChecker creates a new instance of Checker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewChecker(t interface { mock.TestingT Cleanup(func()) -} - -// NewChecker creates a new instance of Checker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewChecker(t mockConstructorTestingTNewChecker) *Checker { +}) *Checker { mock := &Checker{} mock.Mock.Test(t) diff --git a/internal/mocks/EthClient.go b/internal/mocks/EthClient.go index 4e8a299f..b6dfb96d 100644 --- a/internal/mocks/EthClient.go +++ b/internal/mocks/EthClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -31,6 +31,10 @@ func (_m *EthClient) EXPECT() *EthClient_Expecter { func (_m *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { ret := _m.Called(ctx, number) + if len(ret) == 0 { + panic("no return value specified for HeaderByNumber") + } + var r0 *types.Header var r1 error if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (*types.Header, error)); ok { @@ -86,6 +90,10 @@ func (_c *EthClient_HeaderByNumber_Call) RunAndReturn(run func(context.Context, func (_m *EthClient) PeerCount(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for PeerCount") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { @@ -138,6 +146,10 @@ func (_c *EthClient_PeerCount_Call) RunAndReturn(run func(context.Context) (uint func (_m *EthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { ret := _m.Called(ctx, ch) + if len(ret) == 0 { + panic("no return value specified for SubscribeNewHead") + } + var r0 ethereum.Subscription var r1 error if rf, ok := ret.Get(0).(func(context.Context, chan<- *types.Header) (ethereum.Subscription, error)); ok { @@ -193,6 +205,10 @@ func (_c *EthClient_SubscribeNewHead_Call) RunAndReturn(run func(context.Context func (_m *EthClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for SyncProgress") + } + var r0 *ethereum.SyncProgress var r1 error if rf, ok := ret.Get(0).(func(context.Context) (*ethereum.SyncProgress, error)); ok { @@ -243,13 +259,12 @@ func (_c *EthClient_SyncProgress_Call) RunAndReturn(run func(context.Context) (* return _c } -type mockConstructorTestingTNewEthClient interface { +// NewEthClient creates a new instance of EthClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEthClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewEthClient creates a new instance of EthClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEthClient(t mockConstructorTestingTNewEthClient) *EthClient { +}) *EthClient { mock := &EthClient{} mock.Mock.Test(t) diff --git a/internal/mocks/HTTPClient.go b/internal/mocks/HTTPClient.go index 581cc300..fa4f597d 100644 --- a/internal/mocks/HTTPClient.go +++ b/internal/mocks/HTTPClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -17,6 +17,10 @@ type HTTPClient struct { func (_m *HTTPClient) Do(req *http.Request) (*http.Response, error) { ret := _m.Called(req) + if len(ret) == 0 { + panic("no return value specified for Do") + } + var r0 *http.Response var r1 error if rf, ok := ret.Get(0).(func(*http.Request) (*http.Response, error)); ok { @@ -39,13 +43,12 @@ func (_m *HTTPClient) Do(req *http.Request) (*http.Response, error) { return r0, r1 } -type mockConstructorTestingTNewHTTPClient interface { +// NewHTTPClient creates a new instance of HTTPClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHTTPClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewHTTPClient creates a new instance of HTTPClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHTTPClient(t mockConstructorTestingTNewHTTPClient) *HTTPClient { +}) *HTTPClient { mock := &HTTPClient{} mock.Mock.Test(t) diff --git a/internal/mocks/HealthCheckManager.go b/internal/mocks/HealthCheckManager.go index fe99cc64..483a8fdd 100644 --- a/internal/mocks/HealthCheckManager.go +++ b/internal/mocks/HealthCheckManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -24,6 +24,10 @@ func (_m *HealthCheckManager) EXPECT() *HealthCheckManager_Expecter { func (_m *HealthCheckManager) GetUpstreamStatus(upstreamID string) *types.UpstreamStatus { ret := _m.Called(upstreamID) + if len(ret) == 0 { + panic("no return value specified for GetUpstreamStatus") + } + var r0 *types.UpstreamStatus if rf, ok := ret.Get(0).(func(string) *types.UpstreamStatus); ok { r0 = rf(upstreamID) @@ -68,6 +72,10 @@ func (_c *HealthCheckManager_GetUpstreamStatus_Call) RunAndReturn(run func(strin func (_m *HealthCheckManager) IsInitialized() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsInitialized") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -137,13 +145,12 @@ func (_c *HealthCheckManager_StartHealthChecks_Call) RunAndReturn(run func()) *H return _c } -type mockConstructorTestingTNewHealthCheckManager interface { +// NewHealthCheckManager creates a new instance of HealthCheckManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHealthCheckManager(t interface { mock.TestingT Cleanup(func()) -} - -// NewHealthCheckManager creates a new instance of HealthCheckManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewHealthCheckManager(t mockConstructorTestingTNewHealthCheckManager) *HealthCheckManager { +}) *HealthCheckManager { mock := &HealthCheckManager{} mock.Mock.Test(t) diff --git a/internal/mocks/Router.go b/internal/mocks/Router.go index 7516fdf3..31e4897a 100644 --- a/internal/mocks/Router.go +++ b/internal/mocks/Router.go @@ -1,10 +1,9 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks import ( context "context" - http "net/http" jsonrpc "github.com/satsuma-data/node-gateway/internal/jsonrpc" mock "github.com/stretchr/testify/mock" @@ -27,6 +26,10 @@ func (_m *Router) EXPECT() *Router_Expecter { func (_m *Router) IsInitialized() bool { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for IsInitialized") + } + var r0 bool if rf, ok := ret.Get(0).(func() bool); ok { r0 = rf() @@ -65,14 +68,17 @@ func (_c *Router_IsInitialized_Call) RunAndReturn(run func() bool) *Router_IsIni } // Route provides a mock function with given fields: ctx, requestBody -func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error) { +func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error) { ret := _m.Called(ctx, requestBody) + if len(ret) == 0 { + panic("no return value specified for Route") + } + var r0 string var r1 jsonrpc.ResponseBody - var r2 *http.Response - var r3 error - if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error)); ok { + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error)); ok { return rf(ctx, requestBody) } if rf, ok := ret.Get(0).(func(context.Context, jsonrpc.RequestBody) string); ok { @@ -89,21 +95,13 @@ func (_m *Router) Route(ctx context.Context, requestBody jsonrpc.RequestBody) (s } } - if rf, ok := ret.Get(2).(func(context.Context, jsonrpc.RequestBody) *http.Response); ok { + if rf, ok := ret.Get(2).(func(context.Context, jsonrpc.RequestBody) error); ok { r2 = rf(ctx, requestBody) } else { - if ret.Get(2) != nil { - r2 = ret.Get(2).(*http.Response) - } - } - - if rf, ok := ret.Get(3).(func(context.Context, jsonrpc.RequestBody) error); ok { - r3 = rf(ctx, requestBody) - } else { - r3 = ret.Error(3) + r2 = ret.Error(2) } - return r0, r1, r2, r3 + return r0, r1, r2 } // Router_Route_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Route' @@ -125,12 +123,12 @@ func (_c *Router_Route_Call) Run(run func(ctx context.Context, requestBody jsonr return _c } -func (_c *Router_Route_Call) Return(_a0 string, _a1 jsonrpc.ResponseBody, _a2 *http.Response, _a3 error) *Router_Route_Call { - _c.Call.Return(_a0, _a1, _a2, _a3) +func (_c *Router_Route_Call) Return(_a0 string, _a1 jsonrpc.ResponseBody, _a2 error) *Router_Route_Call { + _c.Call.Return(_a0, _a1, _a2) return _c } -func (_c *Router_Route_Call) RunAndReturn(run func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error)) *Router_Route_Call { +func (_c *Router_Route_Call) RunAndReturn(run func(context.Context, jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error)) *Router_Route_Call { _c.Call.Return(run) return _c } @@ -167,13 +165,12 @@ func (_c *Router_Start_Call) RunAndReturn(run func()) *Router_Start_Call { return _c } -type mockConstructorTestingTNewRouter interface { +// NewRouter creates a new instance of Router. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRouter(t interface { mock.TestingT Cleanup(func()) -} - -// NewRouter creates a new instance of Router. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewRouter(t mockConstructorTestingTNewRouter) *Router { +}) *Router { mock := &Router{} mock.Mock.Test(t) diff --git a/internal/mocks/RoutingStrategy.go b/internal/mocks/RoutingStrategy.go index ca4824d7..6f107b43 100644 --- a/internal/mocks/RoutingStrategy.go +++ b/internal/mocks/RoutingStrategy.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.20.0. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -26,6 +26,10 @@ func (_m *MockRoutingStrategy) EXPECT() *MockRoutingStrategy_Expecter { func (_m *MockRoutingStrategy) RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap, requestMetadata metadata.RequestMetadata) (string, error) { ret := _m.Called(upstreamsByPriority, requestMetadata) + if len(ret) == 0 { + panic("no return value specified for RouteNextRequest") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(types.PriorityToUpstreamsMap, metadata.RequestMetadata) (string, error)); ok { @@ -75,13 +79,12 @@ func (_c *MockRoutingStrategy_RouteNextRequest_Call) RunAndReturn(run func(types return _c } -type mockConstructorTestingTNewMockRoutingStrategy interface { +// NewMockRoutingStrategy creates a new instance of MockRoutingStrategy. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRoutingStrategy(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockRoutingStrategy creates a new instance of MockRoutingStrategy. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRoutingStrategy(t mockConstructorTestingTNewMockRoutingStrategy) *MockRoutingStrategy { +}) *MockRoutingStrategy { mock := &MockRoutingStrategy{} mock.Mock.Test(t) diff --git a/internal/route/request_executor.go b/internal/route/request_executor.go index 77bf220e..4779a2ae 100644 --- a/internal/route/request_executor.go +++ b/internal/route/request_executor.go @@ -14,6 +14,7 @@ import ( "github.com/satsuma-data/node-gateway/internal/client" "github.com/satsuma-data/node-gateway/internal/config" "github.com/satsuma-data/node-gateway/internal/jsonrpc" + "github.com/satsuma-data/node-gateway/internal/util" "go.uber.org/zap" ) @@ -34,18 +35,31 @@ func (e *HandledError) Error() string { } type OriginError struct { - err error + err error + response string + ResponseCode int +} + +type HTTPResponse struct { + StatusCode int } func (e *OriginError) Error() string { return fmt.Sprintf("error making request to origin: %v", e.err) } +/* + * Return arguments + * 1. JSON Response body - Body of response decoded into JSON RPC, if possible. + * 2. HTTP Response - HTTP response from upstream, if possible. + * 3. Cached - Whether the response was cached. + * 4. Error - Error encountered when making request to upstream or another error encountered during processing. + */ func (r *RequestExecutor) routeToConfig( ctx context.Context, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig, -) (jsonrpc.ResponseBody, *http.Response, bool, error) { +) (jsonrpc.ResponseBody, *HTTPResponse, bool, error) { bodyBytes, err := requestBody.Encode() if err != nil { r.logger.Error("Could not serialize request.", zap.Any("request", requestBody), zap.Error(err)) @@ -74,7 +88,7 @@ func (r *RequestExecutor) routeToConfig( var ( respBody jsonrpc.ResponseBody - resp *http.Response + resp *HTTPResponse ) singleRequestBody, isSingleRequestBody := requestBody.(*jsonrpc.SingleRequestBody) @@ -85,27 +99,24 @@ func (r *RequestExecutor) routeToConfig( // We must clone the httpReq otherwise the body will already be closed on the second request. respBody, resp, cached, err = r.retrieveOrCacheRequest(cloneRequest(httpReq), *singleRequestBody, configToRoute) if err != nil { - originError, _ := err.(*OriginError) - // An OriginError indicates a cache miss and request failure to origin. - // We want this error to bubble up. - // Unknown cache errors will default back to the "non-caching" behavior. - if originError != nil { + switch e := err.(type) { + case *OriginError, *jsonrpc.DecodeError: + // These errors indicates a cache miss and request failure to origin. + // We want this error to bubble up. + // Unknown cache errors will default back to the "non-caching" behavior. r.logger.Warn("caching error making request to origin", zap.Error(err), zap.Any("request", requestBody)) - return nil, nil, cached, originError + return nil, nil, cached, e + default: + r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody)) } - - r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody)) } else { return respBody, resp, cached, nil } } respBody, resp, err = r.getResponseBody(httpReq, requestBody, configToRoute) - if err != nil { - return nil, nil, false, err - } - return respBody, resp, false, nil + return respBody, resp, false, err } func (r *RequestExecutor) useCache(requestBody jsonrpc.RequestBody) bool { @@ -116,22 +127,22 @@ func (r *RequestExecutor) useCache(requestBody jsonrpc.RequestBody) bool { return r.cache.ShouldCacheMethod(requestBody.GetMethod()) } -func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestBody jsonrpc.SingleRequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *http.Response, bool, error) { +func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestBody jsonrpc.SingleRequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *HTTPResponse, bool, error) { var ( - respBody jsonrpc.ResponseBody - resp *http.Response + jsonRPCRespBody jsonrpc.ResponseBody + httpResp *HTTPResponse ) originFunc := func() (*jsonrpc.SingleResponseBody, error) { var err error // Any errors will result in respBody and resp being nil. - respBody, resp, err = r.getResponseBody(httpReq, &requestBody, configToRoute) //nolint:bodyclose // linter bug + jsonRPCRespBody, httpResp, err = r.getResponseBody(httpReq, &requestBody, configToRoute) if err != nil { return nil, err } - singleRespBody, ok := respBody.(*jsonrpc.SingleResponseBody) + singleRespBody, ok := jsonRPCRespBody.(*jsonrpc.SingleResponseBody) if !ok { return nil, errors.New("batched responses do not support caching") } @@ -157,22 +168,21 @@ func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestB case *HandledError: // The cache uses request coalescing, an error may be returned by another goroutine. // Construct a responsebody and a fake response. - if resp == nil && respBody == nil { - resp = &http.Response{ + if httpResp == nil && jsonRPCRespBody == nil { + httpResp = &HTTPResponse{ StatusCode: http.StatusOK, - Body: io.NopCloser(new(bytes.Buffer)), } rb := *err.rb rb.ID = *requestBody.ID rb.JSONRPC = requestBody.JSONRPCVersion - respBody = &rb + jsonRPCRespBody = &rb } default: return nil, nil, cached, err } } - if resp == nil && respBody == nil { + if httpResp == nil && jsonRPCRespBody == nil { if val == nil { return nil, nil, cached, fmt.Errorf("unexpected empty response from cache") } @@ -180,41 +190,63 @@ func (r *RequestExecutor) retrieveOrCacheRequest(httpReq *http.Request, requestB r.logger.Debug("cache hit", zap.Any("request", requestBody), zap.Any("value", val)) // Fill in id and jsonrpc in the respBody to match the request. - resp = &http.Response{ + httpResp = &HTTPResponse{ StatusCode: http.StatusOK, - Body: io.NopCloser(new(bytes.Buffer)), } - respBody = &jsonrpc.SingleResponseBody{ + jsonRPCRespBody = &jsonrpc.SingleResponseBody{ ID: *requestBody.ID, JSONRPC: requestBody.JSONRPCVersion, Result: val, } } - return respBody, resp, cached, nil + return jsonRPCRespBody, httpResp, cached, nil } -func (r *RequestExecutor) getResponseBody(httpReq *http.Request, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *http.Response, error) { +func (r *RequestExecutor) getResponseBody(httpReq *http.Request, requestBody jsonrpc.RequestBody, configToRoute *config.UpstreamConfig) (jsonrpc.ResponseBody, *HTTPResponse, error) { resp, err := r.httpClient.Do(httpReq) if err != nil { r.logger.Error("Error encountered when executing request.", zap.Any("request", requestBody), - zap.String("upstreamID", configToRoute.ID), zap.String("response", fmt.Sprintf("%v", resp)), zap.Error(err)) - return nil, nil, &OriginError{err} + zap.String("upstreamID", configToRoute.ID), zap.Error(err)) + + return nil, nil, &OriginError{err, "", 0} } + + httpResp := &HTTPResponse{resp.StatusCode} + + // Body can only be read once. Read it out and put it back in the response. + respBodyBytes, err := util.ReadAndCopyBackResponseBody(resp) + respBodyString := string(respBodyBytes) + + if err != nil { + r.logger.Error("Error encountered when reading response body.", zap.Any("request", requestBody), + zap.String("response", respBodyString), zap.String("upstreamID", configToRoute.ID), zap.Error(err)) + + return nil, httpResp, &OriginError{err, "", 0} + } + + if resp.StatusCode >= http.StatusBadRequest { + r.logger.Error("Non-2xx/3xx status code encountered when executing request.", zap.Any("request", requestBody), + zap.String("upstreamID", configToRoute.ID), zap.String("response", respBodyString), + zap.Int("httpStatusCode", resp.StatusCode), zap.Error(err)) + + return nil, httpResp, &OriginError{nil, respBodyString, resp.StatusCode} + } + defer resp.Body.Close() - respBody, err := jsonrpc.DecodeResponseBody(resp) + jsonRPCBody, err := jsonrpc.DecodeResponseBody(respBodyBytes) if err != nil { r.logger.Warn("Could not deserialize response.", zap.Any("request", requestBody), - zap.String("upstreamID", configToRoute.ID), zap.String("response", fmt.Sprintf("%v", resp)), zap.Error(err)) - return nil, nil, &OriginError{err} + zap.String("upstreamID", configToRoute.ID), zap.String("response", respBodyString), zap.Error(err)) + return nil, httpResp, err } - r.logger.Debug("Successfully routed request to upstream.", zap.String("upstreamID", configToRoute.ID), zap.Any("request", requestBody), zap.Any("response", respBody)) + r.logger.Debug("Successfully routed request to upstream.", zap.String("upstreamID", configToRoute.ID), zap.Any("request", requestBody), zap.Any("response", jsonRPCBody)) - return respBody, resp, nil + return jsonRPCBody, httpResp, nil } func cloneRequest(r *http.Request) *http.Request { diff --git a/internal/route/request_executor_test.go b/internal/route/request_executor_test.go index 45494977..23542942 100644 --- a/internal/route/request_executor_test.go +++ b/internal/route/request_executor_test.go @@ -70,15 +70,15 @@ func TestRetrieveOrCacheRequest(t *testing.T) { expected, _ := rpcCache.Marshal(raw) redisClientMock.ExpectSet(cacheKey, expected, cacheConfig.TTL).SetVal("OK") - respBody, resp, cached, _ := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() + jsonRPCResponseBody, httpResponse, cached, _ := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - singleRespBody := respBody.GetSubResponses()[0] + singleRespBody := jsonRPCResponseBody.GetSubResponses()[0] assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Nil(t, singleRespBody.Error) assert.Equal(t, raw, singleRespBody.Result) + assert.Equal(t, httpResp.StatusCode, httpResponse.StatusCode) assert.False(t, cached) // Send a new request with new ID. @@ -90,15 +90,15 @@ func TestRetrieveOrCacheRequest(t *testing.T) { // SetVal simulates returned value on a Get cache hit. redisClientMock.ExpectGet(cacheKey).SetVal(bytes.NewBuffer(expected).String()) - respBody, resp, cached, _ = executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() + jsonRPCResponseBody, httpResponse, cached, _ = executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - singleRespBody = respBody.GetSubResponses()[0] + singleRespBody = jsonRPCResponseBody.GetSubResponses()[0] assert.Equal(t, int64(20), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Nil(t, singleRespBody.Error) assert.Equal(t, raw, singleRespBody.Result) + assert.Equal(t, httpResp.StatusCode, httpResponse.StatusCode) assert.True(t, cached) if err := redisClientMock.ExpectationsWereMet(); err != nil { @@ -144,9 +144,6 @@ func TestRetrieveOrCacheRequest_OriginError(t *testing.T) { httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - if resp != nil { - resp.Body.Close() - } assert.Nil(t, respBody) assert.Nil(t, resp) @@ -191,11 +188,11 @@ func TestRetrieveOrCacheRequest_JSONRPCError(t *testing.T) { bodyBytes, _ := requestBody.Encode() httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() singleRespBody := respBody.GetSubResponses()[0] assert.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode) assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Equal(t, "RPC error", singleRespBody.Error.Message) @@ -238,11 +235,11 @@ func TestRetrieveOrCacheRequest_NullResultError(t *testing.T) { bodyBytes, _ := requestBody.Encode() httpReq, _ := http.NewRequestWithContext(ctx, "POST", configToRoute.HTTPURL, bytes.NewReader(bodyBytes)) respBody, resp, _, err := executor.retrieveOrCacheRequest(httpReq, requestBody, &configToRoute) - resp.Body.Close() singleRespBody := respBody.GetSubResponses()[0] assert.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode) assert.Equal(t, int64(1), singleRespBody.ID) assert.Equal(t, "2.0", singleRespBody.JSONRPC) assert.Equal(t, json.RawMessage("null"), singleRespBody.Result) diff --git a/internal/route/router.go b/internal/route/router.go index 9ccc01f5..8ccd95b6 100644 --- a/internal/route/router.go +++ b/internal/route/router.go @@ -1,15 +1,12 @@ package route import ( - "bytes" "context" - "errors" "github.com/satsuma-data/node-gateway/internal/cache" "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" - "io" "net/http" "strconv" "time" @@ -31,7 +28,7 @@ import ( type Router interface { Start() IsInitialized() bool - Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, *http.Response, error) + Route(ctx context.Context, requestBody jsonrpc.RequestBody) (string, jsonrpc.ResponseBody, error) } type SimpleRouter struct { @@ -111,22 +108,12 @@ func (r *SimpleRouter) IsInitialized() bool { func (r *SimpleRouter) Route( ctx context.Context, requestBody jsonrpc.RequestBody, -) (string, jsonrpc.ResponseBody, *http.Response, error) { +) (string, jsonrpc.ResponseBody, error) { requestMetadata := r.metadataParser.Parse(requestBody) upstreamID, err := r.routingStrategy.RouteNextRequest(r.priorityToUpstreams, requestMetadata) if err != nil { - switch { - case errors.Is(err, ErrNoHealthyUpstreams): - httpResp := &http.Response{ - StatusCode: http.StatusServiceUnavailable, - Body: io.NopCloser(bytes.NewBufferString(err.Error())), - } - - return "", nil, httpResp, nil - default: - return "", nil, nil, err - } + return "", nil, err } var configToRoute config.UpstreamConfig @@ -140,11 +127,11 @@ func (r *SimpleRouter) Route( r.logger.Debug("Routing request to upstream.", zap.String("upstreamID", upstreamID), zap.Any("request", requestBody), zap.String("client", util.GetClientFromContext(ctx))) start := time.Now() - body, response, cached, err := r.requestExecutor.routeToConfig(ctx, requestBody, &configToRoute) + jsonRPCResponse, httpResponse, cached, err := r.requestExecutor.routeToConfig(ctx, requestBody, &configToRoute) HTTPReponseCode := "" - if response != nil { - HTTPReponseCode = strconv.Itoa(response.StatusCode) + if httpResponse != nil { + HTTPReponseCode = strconv.Itoa(httpResponse.StatusCode) } r.metricsContainer.UpstreamRPCRequestsTotal.WithLabelValues( @@ -155,7 +142,7 @@ func (r *SimpleRouter) Route( strconv.FormatBool(cached), ).Inc() - if err != nil { + if err != nil || httpResponse.StatusCode >= http.StatusBadRequest { r.metricsContainer.UpstreamRPCRequestErrorsTotal.WithLabelValues( util.GetClientFromContext(ctx), upstreamID, @@ -165,7 +152,7 @@ func (r *SimpleRouter) Route( ).Inc() } - if body != nil { + if jsonRPCResponse != nil { // To help correlate request IDs to responses. // It's the responsibility of the client to provide unique IDs. reqIDToRequestMap := make(map[int64]jsonrpc.SingleRequestBody) @@ -180,9 +167,9 @@ func (r *SimpleRouter) Route( reqIDToRequestMap[*req.ID] = req } - for _, resp := range body.GetSubResponses() { + for _, resp := range jsonRPCResponse.GetSubResponses() { if resp.Error != nil { - zap.L().Warn("Encountered error in upstream JSONRPC response.", + r.logger.Warn("Encountered error in upstream JSONRPC response.", zap.Any("request", requestBody), zap.Any("error", resp.Error), zap.String("client", util.GetClientFromContext(ctx)), zap.String("upstreamID", upstreamID)) @@ -196,7 +183,6 @@ func (r *SimpleRouter) Route( upstreamID, configToRoute.HTTPURL, reqIDToRequestMap[resp.ID].Method, - HTTPReponseCode, strconv.Itoa(resp.Error.Code), ).Inc() } @@ -211,5 +197,5 @@ func (r *SimpleRouter) Route( HTTPReponseCode, ).Observe(time.Since(start).Seconds()) - return upstreamID, body, response, err + return upstreamID, jsonRPCResponse, err } diff --git a/internal/route/router_test.go b/internal/route/router_test.go index 97ae667e..354d55fc 100644 --- a/internal/route/router_test.go +++ b/internal/route/router_test.go @@ -35,24 +35,16 @@ func TestRouter_NoHealthyUpstreams(t *testing.T) { cacheConfig := config.ChainCacheConfig{} routingStrategy := mocks.NewMockRoutingStrategy(t) - routingStrategy.EXPECT().RouteNextRequest(mock.Anything, mock.Anything).Return("", ErrNoHealthyUpstreams) + routingStrategy.EXPECT().RouteNextRequest(mock.Anything, mock.Anything).Return("", DefaultNoHealthyUpstreamsError) router := NewRouter("mainnet", cacheConfig, upstreamConfigs, make([]config.GroupConfig, 0), metadata.NewChainMetadataStore(), managerMock, routingStrategy, metrics.NewContainer(config.TestChainName), zap.L(), nil) router.(*SimpleRouter).healthCheckManager = managerMock router.Start() - _, jsonResp, httpResp, err := router.Route(context.Background(), &jsonrpc.BatchRequestBody{}) - defer httpResp.Body.Close() + _, jsonResp, err := router.Route(context.Background(), &jsonrpc.BatchRequestBody{}) assert.Nil(t, jsonResp) - assert.Equal(t, 503, httpResp.StatusCode) - assert.Equal(t, "no healthy upstreams", readyBody(httpResp.Body)) - assert.Nil(t, err) -} - -func readyBody(body io.ReadCloser) string { - bodyBytes, _ := io.ReadAll(body) - return string(bodyBytes) + assert.Equal(t, DefaultNoHealthyUpstreamsError, err) } func TestRouter_GroupUpstreamsByPriority(t *testing.T) { @@ -116,8 +108,7 @@ func TestRouter_GroupUpstreamsByPriority(t *testing.T) { router.(*SimpleRouter).requestExecutor.httpClient = httpClientMock router.(*SimpleRouter).routingStrategy = routingStrategyMock - upstreamID, jsonRPCResp, httpResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) - defer httpResp.Body.Close() + upstreamID, jsonRPCResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) assert.Nil(t, err) assert.Equal(t, erigonConfig.ID, upstreamID) @@ -163,8 +154,7 @@ func TestGroupUpstreamsByPriority_NoGroups(t *testing.T) { router.(*SimpleRouter).requestExecutor.httpClient = httpClientMock router.(*SimpleRouter).routingStrategy = routingStrategyMock - upstreamID, jsonRPCResp, httpResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) - defer httpResp.Body.Close() + upstreamID, jsonRPCResp, err := router.Route(context.Background(), &jsonrpc.SingleRequestBody{Method: "my_method"}) assert.Nil(t, err) assert.Equal(t, erigonConfig.ID, upstreamID) diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index 2fd9a434..9a59c74c 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -1,8 +1,6 @@ package route import ( - "errors" - "sort" "sync/atomic" @@ -32,7 +30,15 @@ func NewPriorityRoundRobinStrategy(logger *zap.Logger) *PriorityRoundRobinStrate } } -var ErrNoHealthyUpstreams = errors.New("no healthy upstreams") +type NoHealthyUpstreamsError struct { + msg string +} + +var DefaultNoHealthyUpstreamsError = &NoHealthyUpstreamsError{"no healthy upstreams found"} + +func (e *NoHealthyUpstreamsError) Error() string { + return e.msg +} func (s *PriorityRoundRobinStrategy) RouteNextRequest( upstreamsByPriority types.PriorityToUpstreamsMap, @@ -53,5 +59,5 @@ func (s *PriorityRoundRobinStrategy) RouteNextRequest( s.logger.Debug("Did not find any healthy nodes in priority.", zap.Int("priority", priority)) } - return "", ErrNoHealthyUpstreams + return "", DefaultNoHealthyUpstreamsError } diff --git a/internal/route/routing_strategy_test.go b/internal/route/routing_strategy_test.go index d7aaa253..96b7025d 100644 --- a/internal/route/routing_strategy_test.go +++ b/internal/route/routing_strategy_test.go @@ -62,6 +62,6 @@ func TestPriorityStrategy_NoUpstreams(t *testing.T) { for i := 0; i < 10; i++ { upstreamID, err := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "", upstreamID) - assert.True(t, errors.Is(err, ErrNoHealthyUpstreams)) + assert.True(t, errors.Is(err, DefaultNoHealthyUpstreamsError)) } } diff --git a/internal/server/rpc_handler.go b/internal/server/rpc_handler.go index 1e3e9e90..75353aaf 100644 --- a/internal/server/rpc_handler.go +++ b/internal/server/rpc_handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "time" @@ -43,7 +44,16 @@ func (h *RPCHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { return } - requestBody, err := jsonrpc.DecodeRequestBody(req) + // No need to close the request body, the Server implementation will take care of it. + requestBodyRawBytes, err := io.ReadAll(req.Body) + + if err != nil { + errMsg := fmt.Sprintf("Request body could not be read, err: %s", err.Error()) + h.logger.Error(errMsg) + respondJSON(h.logger, writer, errMsg, http.StatusInternalServerError) + } + + requestBody, err := jsonrpc.DecodeRequestBody(requestBodyRawBytes) if err != nil { errMsg := fmt.Sprintf("Request body could not be parsed, err: %s", err.Error()) resp := jsonrpc.CreateErrorJSONRPCResponseBody(errMsg, jsonrpc.InternalServerErrorCode) @@ -55,26 +65,31 @@ func (h *RPCHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { h.logger.Debug("Request received.", zap.String("method", req.Method), zap.String("path", req.URL.Path), zap.String("query", req.URL.RawQuery), zap.Any("body", requestBody)) - upstreamID, respBody, resp, err := h.router.Route(ctx, requestBody) - if resp != nil { - defer resp.Body.Close() - } + upstreamID, jsonRPCRespBody, err := h.router.Route(ctx, requestBody) if err != nil { switch e := err.(type) { - case jsonrpc.DecodeError: + // Still pass the response to client if we're not able to decode response from upstream. + case *jsonrpc.DecodeError: respondRaw(nil, writer, e.Content, http.StatusOK) return + case *route.NoHealthyUpstreamsError: + respondJSON(h.logger, writer, "No healthy upstreams.", http.StatusServiceUnavailable) + return default: - resp := jsonrpc.CreateErrorJSONRPCResponseBodyWithRequest(fmt.Sprintf("Request could not be routed, err: %s", err.Error()), jsonrpc.InternalServerErrorCode, requestBody) - respondJSONRPC(h.logger, writer, resp, http.StatusInternalServerError) + statusCode := http.StatusInternalServerError + if originErr, ok := err.(*route.OriginError); ok && originErr.ResponseCode != 0 { + statusCode = originErr.ResponseCode + } + + respondJSON(h.logger, writer, fmt.Sprintf("Request could not be routed, err: %s", err.Error()), statusCode) return } } writer.Header().Set("X-Upstream-ID", upstreamID) - respondJSONRPC(h.logger, writer, respBody, resp.StatusCode) + respondJSONRPC(h.logger, writer, jsonRPCRespBody, http.StatusOK) h.logger.Debug("Request successfully routed.", zap.Any("requestBody", requestBody)) } diff --git a/internal/server/web_server_e2e_test.go b/internal/server/web_server_e2e_test.go index 18b81f4d..6b2e9e03 100644 --- a/internal/server/web_server_e2e_test.go +++ b/internal/server/web_server_e2e_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "math/big" "net/http" "net/http/httptest" @@ -332,9 +333,11 @@ func executeRequest( handler.ServeHTTP(recorder, req) result := recorder.Result() + resultBody, _ := io.ReadAll(result.Body) + defer result.Body.Close() - responseBody, err := jsonrpc.DecodeResponseBody(result) + responseBody, err := jsonrpc.DecodeResponseBody(resultBody) assert.NoError(t, err) require.NotNil(t, responseBody) @@ -369,7 +372,10 @@ func setUpHealthyUpstream( t.Helper() return httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - requestBody, err := jsonrpc.DecodeRequestBody(request) + requestBodyRawBytes, err := io.ReadAll(request.Body) + assert.NoError(t, err) + + requestBody, err := jsonrpc.DecodeRequestBody(requestBodyRawBytes) assert.NoError(t, err) var responseBody jsonrpc.ResponseBody @@ -434,7 +440,10 @@ func setUpUnhealthyUpstream(t *testing.T) *httptest.Server { t.Helper() return httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - requestBody, err := jsonrpc.DecodeRequestBody(request) + requestBodyRawBytes, err := io.ReadAll(request.Body) + assert.NoError(t, err) + + requestBody, err := jsonrpc.DecodeRequestBody(requestBodyRawBytes) assert.NoError(t, err) var responseBody jsonrpc.ResponseBody diff --git a/internal/server/web_server_test.go b/internal/server/web_server_test.go index 6f089204..6fa215a3 100644 --- a/internal/server/web_server_test.go +++ b/internal/server/web_server_test.go @@ -7,7 +7,6 @@ import ( "io" "net/http" "net/http/httptest" - "strings" "testing" "github.com/satsuma-data/node-gateway/internal/config" @@ -28,10 +27,7 @@ func TestHandleJSONRPCRequest_Success(t *testing.T) { } router.EXPECT().Route(mock.Anything, mock.Anything). Return("fakeUpstream", expectedRPCResponse, - &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(strings.NewReader("dummy")), - }, nil) + nil) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} @@ -44,9 +40,11 @@ func TestHandleJSONRPCRequest_Success(t *testing.T) { handler.ServeHTTP(recorder, req) result := recorder.Result() + resultBody, _ := io.ReadAll(result.Body) + defer result.Body.Close() - jsonRPCResponse, _ := jsonrpc.DecodeResponseBody(result) + jsonRPCResponse, _ := jsonrpc.DecodeResponseBody(resultBody) assert.Equal(t, result.StatusCode, http.StatusOK) assert.Equal(t, expectedRPCResponse, jsonRPCResponse) @@ -126,11 +124,7 @@ func TestHandleJSONRPCRequest_NilJSONRPCResponse(t *testing.T) { router := mocks.NewRouter(t) router.EXPECT().Route(mock.Anything, mock.Anything). - Return("", nil, - &http.Response{ - StatusCode: http.StatusAccepted, - Body: io.NopCloser(strings.NewReader("dummy")), - }, nil) + Return("", nil, nil) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} @@ -145,7 +139,7 @@ func TestHandleJSONRPCRequest_NilJSONRPCResponse(t *testing.T) { result := recorder.Result() defer result.Body.Close() - assert.Equal(t, http.StatusAccepted, result.StatusCode) + assert.Equal(t, http.StatusOK, result.StatusCode) body, _ := io.ReadAll(result.Body) assert.Empty(t, body) } @@ -155,7 +149,7 @@ func TestHandleJSONRPCRequest_JSONRPCDecodeError(t *testing.T) { undecodableContent := []byte("content") router.EXPECT().Route(mock.Anything, mock.Anything). - Return("", nil, nil, jsonrpc.DecodeError{Err: errors.New("error decoding"), Content: undecodableContent}) + Return("", nil, &jsonrpc.DecodeError{Err: errors.New("error decoding"), Content: undecodableContent}) handler := &RPCHandler{path: "/" + config.TestChainName, router: router, logger: zap.L()} diff --git a/internal/util/http.go b/internal/util/http.go new file mode 100644 index 00000000..3bb2f332 --- /dev/null +++ b/internal/util/http.go @@ -0,0 +1,20 @@ +package util + +import ( + "bytes" + "io" + "net/http" +) + +func ReadAndCopyBackResponseBody(resp *http.Response) ([]byte, error) { + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return respBody, err + } + + resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) + + return respBody, nil +} From 77cd3c36002a7afb3e6ab47cc30c428772e32cd5 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Tue, 19 Dec 2023 13:12:01 -0800 Subject: [PATCH 2/6] fix: remove depguard from linter --- .golangci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.golangci.yml b/.golangci.yml index 19bd5add..6a7c6e1e 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,7 +24,7 @@ linters: enable: - bodyclose - deadcode - - depguard + # - depguard - dogsled - dupl - errcheck From 1a7a578f0e64084f7c08d314447788d0f6c72c69 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Tue, 19 Dec 2023 13:12:47 -0800 Subject: [PATCH 3/6] fix: remove varcheck and deadcode linters since they're deprecated --- .golangci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 6a7c6e1e..c31399b2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -23,7 +23,6 @@ linters: disable-all: true enable: - bodyclose - - deadcode # - depguard - dogsled - dupl @@ -55,7 +54,6 @@ linters: - typecheck - unconvert - unparam - - varcheck - whitespace - wsl From b3d64808c3ead1b400180c4e77d7f6267e3821f8 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Tue, 19 Dec 2023 13:17:05 -0800 Subject: [PATCH 4/6] fix: satsify linter --- internal/metadata/store.go | 2 +- internal/route/node_filter.go | 8 ++++---- internal/server/web_server_e2e_test.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/metadata/store.go b/internal/metadata/store.go index 50b644e3..74f331db 100644 --- a/internal/metadata/store.go +++ b/internal/metadata/store.go @@ -75,7 +75,7 @@ func (c *ChainMetadataStore) ProcessBlockHeightUpdate(groupID, upstreamID string } } -func (c *ChainMetadataStore) ProcessErrorUpdate(groupID, upstreamID string, err error) { +func (c *ChainMetadataStore) ProcessErrorUpdate(_, upstreamID string, err error) { c.opChannel <- func() { c.updateErrorForUpstream(upstreamID, err) } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 37fcadce..53b52966 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -48,7 +48,7 @@ type HasEnoughPeers struct { minimumPeerCount uint64 } -func (f *HasEnoughPeers) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, numUpstreamsInPriorityGroup int) bool { +func (f *HasEnoughPeers) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) peerCheck, _ := upstreamStatus.PeerCheck.(*checks.PeerCheck) @@ -79,7 +79,7 @@ type IsDoneSyncing struct { logger *zap.Logger } -func (f *IsDoneSyncing) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, numUpstreamsInPriorityGroup int) bool { +func (f *IsDoneSyncing) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, _ int) bool { upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) isSyncingCheck, _ := upstreamStatus.SyncingCheck.(*checks.SyncingCheck) @@ -116,7 +116,7 @@ type IsCloseToGlobalMaxHeight struct { func (f *IsCloseToGlobalMaxHeight) Apply( _ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, - numUpstreamsInPriorityGroup int, + _ int, ) bool { status := f.chainMetadataStore.GetBlockHeightStatus(upstreamConfig.GroupID, upstreamConfig.ID) @@ -206,7 +206,7 @@ type AreMethodsAllowed struct { func (f *AreMethodsAllowed) Apply( requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig, - numUpstreamsInPriorityGroup int, + _ int, ) bool { for _, method := range requestMetadata.Methods { // Check methods that are have been disabled on the upstream. diff --git a/internal/server/web_server_e2e_test.go b/internal/server/web_server_e2e_test.go index 6b2e9e03..27bce948 100644 --- a/internal/server/web_server_e2e_test.go +++ b/internal/server/web_server_e2e_test.go @@ -118,7 +118,7 @@ func TestServeHTTP_ForwardsToCorrectNodeTypeBasedOnStatefulness(t *testing.T) { expectedBlockTxCount := 29 fullNodeUpstream := setUpHealthyUpstream(t, map[string]func(t *testing.T, request jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody{ - statefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { + statefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { //nolint:unparam // test method always returns err t.Helper() t.Errorf("Unexpected call to stateful method %s on a full node!", statefulMethod) @@ -146,7 +146,7 @@ func TestServeHTTP_ForwardsToCorrectNodeTypeBasedOnStatefulness(t *testing.T) { ID: *request.ID, } }, - nonStatefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { + nonStatefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { //nolint:unparam // test method always returns err t.Helper() t.Errorf("Unexpected call to method %s: archive node is at lower priority!", nonStatefulMethod) @@ -198,13 +198,13 @@ func TestServeHTTP_ForwardsToCorrectNodeTypeBasedOnStatefulnessBatch(t *testing. expectedBlockTxCount := 29 fullNodeUpstream := setUpHealthyUpstream(t, map[string]func(t *testing.T, request jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody{ - statefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { + statefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { //nolint:unparam // test method always returns err t.Helper() t.Errorf("Unexpected call to stateful method %s on a full node!", statefulMethod) return jsonrpc.SingleResponseBody{} }, - nonStatefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { + nonStatefulMethod: func(t *testing.T, _ jsonrpc.SingleRequestBody) jsonrpc.SingleResponseBody { //nolint:unparam // test method always returns err t.Helper() t.Errorf("Unexpected call to non-stateful method %s on a full node!", nonStatefulMethod) From 1da3476b95738efda91f6ae23b2261579b262573 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Tue, 19 Dec 2023 14:56:41 -0800 Subject: [PATCH 5/6] fix: small fixes and logging improvements --- internal/route/request_executor.go | 22 +++++++++++----------- internal/server/rpc_handler.go | 2 ++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/route/request_executor.go b/internal/route/request_executor.go index 4779a2ae..264ca878 100644 --- a/internal/route/request_executor.go +++ b/internal/route/request_executor.go @@ -45,7 +45,7 @@ type HTTPResponse struct { } func (e *OriginError) Error() string { - return fmt.Sprintf("error making request to origin: %v", e.err) + return fmt.Sprintf("error making request to origin. err: %v, resp: %s, respCode: %d", e.err, e.response, e.ResponseCode) } /* @@ -87,8 +87,8 @@ func (r *RequestExecutor) routeToConfig( } var ( - respBody jsonrpc.ResponseBody - resp *HTTPResponse + jsonRespBody jsonrpc.ResponseBody + httpResp *HTTPResponse ) singleRequestBody, isSingleRequestBody := requestBody.(*jsonrpc.SingleRequestBody) @@ -97,26 +97,26 @@ func (r *RequestExecutor) routeToConfig( var cached bool // In case of unknown caching errors, the httpReq might get used twice. // We must clone the httpReq otherwise the body will already be closed on the second request. - respBody, resp, cached, err = r.retrieveOrCacheRequest(cloneRequest(httpReq), *singleRequestBody, configToRoute) + jsonRespBody, httpResp, cached, err = r.retrieveOrCacheRequest(cloneRequest(httpReq), *singleRequestBody, configToRoute) if err != nil { switch e := err.(type) { case *OriginError, *jsonrpc.DecodeError: // These errors indicates a cache miss and request failure to origin. // We want this error to bubble up. // Unknown cache errors will default back to the "non-caching" behavior. - r.logger.Warn("caching error making request to origin", zap.Error(err), zap.Any("request", requestBody)) - return nil, nil, cached, e + r.logger.Warn("caching error making request to origin", zap.Error(err), zap.Any("request", requestBody), zap.Any("resp", httpResp)) + return nil, httpResp, cached, e default: - r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody)) + r.logger.Warn("unknown caching error", zap.Error(err), zap.Any("request", requestBody), zap.Any("resp", httpResp)) } } else { - return respBody, resp, cached, nil + return jsonRespBody, httpResp, cached, nil } } - respBody, resp, err = r.getResponseBody(httpReq, requestBody, configToRoute) + jsonRespBody, httpResp, err = r.getResponseBody(httpReq, requestBody, configToRoute) - return respBody, resp, false, err + return jsonRespBody, httpResp, false, err } func (r *RequestExecutor) useCache(requestBody jsonrpc.RequestBody) bool { @@ -227,7 +227,7 @@ func (r *RequestExecutor) getResponseBody(httpReq *http.Request, requestBody jso } if resp.StatusCode >= http.StatusBadRequest { - r.logger.Error("Non-2xx/3xx status code encountered when executing request.", zap.Any("request", requestBody), + r.logger.Error("4xx/5xx status code encountered when executing request.", zap.Any("request", requestBody), zap.String("upstreamID", configToRoute.ID), zap.String("response", respBodyString), zap.Int("httpStatusCode", resp.StatusCode), zap.Error(err)) diff --git a/internal/server/rpc_handler.go b/internal/server/rpc_handler.go index 75353aaf..a2382f74 100644 --- a/internal/server/rpc_handler.go +++ b/internal/server/rpc_handler.go @@ -51,6 +51,8 @@ func (h *RPCHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { errMsg := fmt.Sprintf("Request body could not be read, err: %s", err.Error()) h.logger.Error(errMsg) respondJSON(h.logger, writer, errMsg, http.StatusInternalServerError) + + return } requestBody, err := jsonrpc.DecodeRequestBody(requestBodyRawBytes) From c791c0efedd0a6d6b4a7a354597c0affd37f8b97 Mon Sep 17 00:00:00 2001 From: Brian Luong Date: Tue, 19 Dec 2023 15:03:41 -0800 Subject: [PATCH 6/6] fix: bump version of golangci-lint in ci --- .github/workflows/golangci-lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 485e6e47..a519f92e 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -22,5 +22,5 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.48.0 + version: v1.55.2 args: ./...