diff --git a/Makefile b/Makefile index 32feb2a5184..bb35ec896dd 100644 --- a/Makefile +++ b/Makefile @@ -205,6 +205,7 @@ generate-mocks: install-mock-generators mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock" mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock" mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock" + mockery --name 'WebsocketConnection' --dir="./engine/access/rest/websockets" --case=underscore --output="./engine/access/rest/websockets/mock" --outpkg="mock" mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock" mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock" mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock" diff --git a/engine/access/rest/websockets/config.go b/engine/access/rest/websockets/config.go index 7f563ba94b9..8236913dd5f 100644 --- a/engine/access/rest/websockets/config.go +++ b/engine/access/rest/websockets/config.go @@ -4,6 +4,33 @@ import ( "time" ) +const ( + // PingPeriod defines the interval at which ping messages are sent to the client. + // This value must be less than pongWait, cause it that case the server ensures it sends a ping well before the PongWait + // timeout elapses. Each new pong message resets the server's read deadline, keeping the connection alive as long as + // the client is responsive. + // + // Example: + // At t=9, the server sends a ping, initial read deadline is t=10 (for the first message) + // At t=10, the client responds with a pong. The server resets its read deadline to t=20. + // At t=18, the server sends another ping. If the client responds with a pong at t=19, the read deadline is extended to t=29. + // + // In case of failure: + // If the client stops responding, the server will send a ping at t=9 but won't receive a pong by t=10. The server then closes the connection. + PingPeriod = (PongWait * 9) / 10 + + // PongWait specifies the maximum time to wait for a pong response message from the peer + // after sending a ping + PongWait = 10 * time.Second + + // WriteWait specifies a timeout for the write operation. If the write + // isn't completed within this duration, it fails with a timeout error. + // SetWriteDeadline ensures the write operation does not block indefinitely + // if the client is slow or unresponsive. This prevents resource exhaustion + // and allows the server to gracefully handle timeouts for delayed writes. + WriteWait = 10 * time.Second +) + type Config struct { MaxSubscriptionsPerConnection uint64 MaxResponsesPerSecond uint64 diff --git a/engine/access/rest/websockets/connection.go b/engine/access/rest/websockets/connection.go new file mode 100644 index 00000000000..5170e917e9f --- /dev/null +++ b/engine/access/rest/websockets/connection.go @@ -0,0 +1,57 @@ +package websockets + +import ( + "time" + + "github.com/gorilla/websocket" +) + +type WebsocketConnection interface { + ReadJSON(v interface{}) error + WriteJSON(v interface{}) error + WriteControl(messageType int, deadline time.Time) error + Close() error + SetReadDeadline(deadline time.Time) error + SetWriteDeadline(deadline time.Time) error + SetPongHandler(h func(string) error) +} + +type WebsocketConnectionImpl struct { + conn *websocket.Conn +} + +func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnectionImpl { + return &WebsocketConnectionImpl{ + conn: conn, + } +} + +var _ WebsocketConnection = (*WebsocketConnectionImpl)(nil) + +func (c *WebsocketConnectionImpl) ReadJSON(v interface{}) error { + return c.conn.ReadJSON(v) +} + +func (c *WebsocketConnectionImpl) WriteJSON(v interface{}) error { + return c.conn.WriteJSON(v) +} + +func (c *WebsocketConnectionImpl) WriteControl(messageType int, deadline time.Time) error { + return c.conn.WriteControl(messageType, nil, deadline) +} + +func (c *WebsocketConnectionImpl) Close() error { + return c.conn.Close() +} + +func (c *WebsocketConnectionImpl) SetReadDeadline(deadline time.Time) error { + return c.conn.SetReadDeadline(deadline) +} + +func (c *WebsocketConnectionImpl) SetWriteDeadline(deadline time.Time) error { + return c.conn.SetWriteDeadline(deadline) +} + +func (c *WebsocketConnectionImpl) SetPongHandler(h func(string) error) { + c.conn.SetPongHandler(h) +} diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index 38bc7306b55..642a454271c 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -4,10 +4,12 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers" "github.com/onflow/flow-go/engine/access/rest/websockets/models" @@ -15,18 +17,20 @@ import ( ) type Controller struct { - logger zerolog.Logger - config Config - conn *websocket.Conn - communicationChannel chan interface{} - dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider] - dataProviderFactory dp.DataProviderFactory + logger zerolog.Logger + config Config + conn WebsocketConnection + + communicationChannel chan interface{} // Channel for sending messages to the client. + + dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider] + dataProviderFactory dp.DataProviderFactory } func NewWebSocketController( logger zerolog.Logger, config Config, - conn *websocket.Conn, + conn WebsocketConnection, dataProviderFactory dp.DataProviderFactory, ) *Controller { return &Controller{ @@ -39,29 +43,101 @@ func NewWebSocketController( } } -// HandleConnection manages the WebSocket connection, adding context and error handling. +// HandleConnection manages the lifecycle of a WebSocket connection, +// including setup, message processing, and graceful shutdown. +// +// Parameters: +// - ctx: The context for controlling cancellation and timeouts. func (c *Controller) HandleConnection(ctx context.Context) { - //TODO: configure the connection with ping-pong and deadlines + defer c.shutdownConnection() + + // configuring the connection with appropriate read/write deadlines and handlers. + err := c.configureKeepalive() + if err != nil { + // TODO: add error handling here + c.logger.Error().Err(err).Msg("error configuring keepalive connection") + + return + } + //TODO: spin up a response limit tracker routine - go c.readMessagesFromClient(ctx) - c.writeMessagesToClient(ctx) + + // for track all goroutines and error handling + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + return c.readMessagesFromClient(gCtx) + }) + + g.Go(func() error { + return c.keepalive(gCtx) + }) + + g.Go(func() error { + return c.writeMessagesToClient(gCtx) + }) + + if err = g.Wait(); err != nil { + //TODO: add error handling here + c.logger.Error().Err(err).Msg("error detected in one of the goroutines") + } +} + +// configureKeepalive sets up the WebSocket connection with a read deadline +// and a handler for receiving pong messages from the client. +// +// The function does the following: +// 1. Sets an initial read deadline to ensure the server doesn't wait indefinitely +// for a pong message from the client. If no message is received within the +// specified `pongWait` duration, the connection will be closed. +// 2. Establishes a Pong handler that resets the read deadline every time a pong +// message is received from the client, allowing the server to continue waiting +// for further pong messages within the new deadline. +// +// No errors are expected during normal operation. +func (c *Controller) configureKeepalive() error { + // Set the initial read deadline for the first pong message + // The Pong handler itself only resets the read deadline after receiving a Pong. + // It doesn't set an initial deadline. The initial read deadline is crucial to prevent the server from waiting + // forever if the client doesn't send Pongs. + if err := c.conn.SetReadDeadline(time.Now().Add(PongWait)); err != nil { + return fmt.Errorf("failed to set the initial read deadline: %w", err) + } + // Establish a Pong handler which sets the handler for pong messages received from the peer. + c.conn.SetPongHandler(func(string) error { + return c.conn.SetReadDeadline(time.Now().Add(PongWait)) + }) + + return nil } // writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection. // The communication channel is filled by data providers. Besides, the response limit tracker is involved in // write message regulation -func (c *Controller) writeMessagesToClient(ctx context.Context) { - //TODO: can it run forever? maybe we should cancel the ctx in the reader routine +// +// No errors are expected during normal operation. All errors are considered benign. +func (c *Controller) writeMessagesToClient(ctx context.Context) error { for { select { case <-ctx.Done(): - return - case msg := <-c.communicationChannel: + return nil + case msg, ok := <-c.communicationChannel: + if !ok { + return fmt.Errorf("communication channel closed, no error occurred") + } // TODO: handle 'response per second' limits + // Specifies a timeout for the write operation. If the write + // isn't completed within this duration, it fails with a timeout error. + // SetWriteDeadline ensures the write operation does not block indefinitely + // if the client is slow or unresponsive. This prevents resource exhaustion + // and allows the server to gracefully handle timeouts for delayed writes. + if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil { + return fmt.Errorf("failed to set the write deadline: %w", err) + } err := c.conn.WriteJSON(msg) if err != nil { - c.logger.Error().Err(err).Msg("error writing to connection") + return fmt.Errorf("failed to write message to connection: %w", err) } } } @@ -69,32 +145,29 @@ func (c *Controller) writeMessagesToClient(ctx context.Context) { // readMessagesFromClient continuously reads messages from a client WebSocket connection, // processes each message, and handles actions based on the message type. -func (c *Controller) readMessagesFromClient(ctx context.Context) { - defer c.shutdownConnection() - +// +// No errors are expected during normal operation. All errors are considered benign. +func (c *Controller) readMessagesFromClient(ctx context.Context) error { for { select { case <-ctx.Done(): - c.logger.Info().Msg("context canceled, stopping read message loop") - return + return nil default: msg, err := c.readMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) { - return + return nil } - c.logger.Warn().Err(err).Msg("error reading message from client") - return + return fmt.Errorf("failed to read message from client: %w", err) } - baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg) + _, validatedMsg, err := c.parseAndValidateMessage(msg) if err != nil { - c.logger.Debug().Err(err).Msg("error parsing and validating client message") - return + return fmt.Errorf("failed to parse and validate client message: %w", err) } if err := c.handleAction(ctx, validatedMsg); err != nil { - c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action") + return fmt.Errorf("failed to handle message action: %w", err) } } } @@ -139,7 +212,6 @@ func (c *Controller) parseAndValidateMessage(message json.RawMessage) (models.Ba validatedMsg = listMsg default: - c.logger.Debug().Str("action", baseMsg.Action).Msg("unknown action type") return baseMsg, nil, fmt.Errorf("unknown action type: %s", baseMsg.Action) } @@ -202,12 +274,12 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis } func (c *Controller) shutdownConnection() { - defer close(c.communicationChannel) - defer func(conn *websocket.Conn) { + defer func() { if err := c.conn.Close(); err != nil { c.logger.Error().Err(err).Msg("error closing connection") } - }(c.conn) + // TODO: safe closing communicationChannel will be included as a part of PR #6642 + }() err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error { dp.Close() @@ -219,3 +291,24 @@ func (c *Controller) shutdownConnection() { c.dataProviders.Clear() } + +// keepalive sends a ping message periodically to keep the WebSocket connection alive +// and avoid timeouts. +// +// No errors are expected during normal operation. All errors are considered benign. +func (c *Controller) keepalive(ctx context.Context) error { + pingTicker := time.NewTicker(PingPeriod) + defer pingTicker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-pingTicker.C: + err := c.conn.WriteControl(websocket.PingMessage, time.Now().Add(WriteWait)) + if err != nil { + return fmt.Errorf("failed to write ping message: %w", err) + } + } + } +} diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go new file mode 100644 index 00000000000..afc14d9823f --- /dev/null +++ b/engine/access/rest/websockets/controller_test.go @@ -0,0 +1,275 @@ +package websockets + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers" + dpmock "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/mock" + connectionmock "github.com/onflow/flow-go/engine/access/rest/websockets/mock" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/utils/unittest" +) + +// ControllerSuite is a test suite for the WebSocket Controller. +type ControllerSuite struct { + suite.Suite + + logger zerolog.Logger + config Config + + connection *connectionmock.WebsocketConnection + dataProviderFactory *dpmock.DataProviderFactory +} + +func TestControllerSuite(t *testing.T) { + suite.Run(t, new(ControllerSuite)) +} + +// SetupTest initializes the test suite with required dependencies. +func (s *ControllerSuite) SetupTest() { + s.logger = unittest.Logger() + s.config = Config{} + + s.connection = connectionmock.NewWebsocketConnection(s.T()) + s.dataProviderFactory = dpmock.NewDataProviderFactory(s.T()) +} + +// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly. +func (s *ControllerSuite) TestConfigureKeepaliveConnection() { + controller := s.initializeController() + + // Mock configureConnection to succeed + s.mockConnectionSetup() + + // Call configureKeepalive and check for errors + err := controller.configureKeepalive() + s.Require().NoError(err, "configureKeepalive should not return an error") + + // Assert expectations + s.connection.AssertExpectations(s.T()) +} + +// TestControllerShutdown ensures that HandleConnection shuts down gracefully when an error occurs. +func (s *ControllerSuite) TestControllerShutdown() { + s.T().Run("keepalive routine failed", func(*testing.T) { + controller := s.initializeController() + + // Mock configureConnection to succeed + s.mockConnectionSetup() + + // Mock keepalive to return an error + done := make(chan struct{}, 1) + s.connection.On("WriteControl", websocket.PingMessage, mock.Anything).Return(func(int, time.Time) error { + close(done) + return websocket.ErrCloseSent + }).Once() + + s.connection. + On("ReadJSON", mock.Anything). + Return(func(interface{}) error { + <-done + return websocket.ErrCloseSent + }). + Once() + + s.connection.On("Close").Return(nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + controller.HandleConnection(ctx) + + // Ensure all expectations are met + s.connection.AssertExpectations(s.T()) + }) + + s.T().Run("read routine failed", func(*testing.T) { + controller := s.initializeController() + // Mock configureConnection to succeed + s.mockConnectionSetup() + + s.connection. + On("ReadJSON", mock.Anything). + Return(func(_ interface{}) error { + return assert.AnError + }). + Once() + + s.connection.On("Close").Return(nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + controller.HandleConnection(ctx) + + // Ensure all expectations are met + s.connection.AssertExpectations(s.T()) + }) + + s.T().Run("write routine failed", func(*testing.T) { + controller := s.initializeController() + + // Mock configureConnection to succeed + s.mockConnectionSetup() + blocksDataProvider := s.mockBlockDataProviderSetup(uuid.New()) + + done := make(chan struct{}, 1) + requestMessage := models.SubscribeMessageRequest{ + BaseMessageRequest: models.BaseMessageRequest{Action: "subscribe"}, + Topic: dp.BlocksTopic, + Arguments: nil, + } + msg, err := json.Marshal(requestMessage) + s.Require().NoError(err) + + // Mocks `ReadJSON(v interface{}) error` which accepts an uninitialize interface that + // receives the contents of the read message. This logic mocks that behavior, setting + // the target with the value `msg` + s.connection. + On("ReadJSON", mock.Anything). + Run(func(args mock.Arguments) { + reqMsg, ok := args.Get(0).(*json.RawMessage) + s.Require().True(ok) + *reqMsg = msg + }). + Return(nil). + Once() + + s.connection. + On("ReadJSON", mock.Anything). + Return(func(interface{}) error { + <-done + return websocket.ErrCloseSent + }) + + s.connection.On("SetWriteDeadline", mock.Anything).Return(nil).Once() + s.connection. + On("WriteJSON", mock.Anything). + Return(func(msg interface{}) error { + close(done) + return assert.AnError + }) + s.connection.On("Close").Return(nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + controller.HandleConnection(ctx) + + // Ensure all expectations are met + s.connection.AssertExpectations(s.T()) + s.dataProviderFactory.AssertExpectations(s.T()) + blocksDataProvider.AssertExpectations(s.T()) + }) + + s.T().Run("context closed", func(*testing.T) { + controller := s.initializeController() + + // Mock configureConnection to succeed + s.mockConnectionSetup() + + s.connection.On("Close").Return(nil).Once() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + controller.HandleConnection(ctx) + + // Ensure all expectations are met + s.connection.AssertExpectations(s.T()) + }) +} + +// TestKeepaliveHappyCase tests the behavior of the keepalive function. +func (s *ControllerSuite) TestKeepaliveHappyCase() { + // Create a context for the test + ctx := context.Background() + + controller := s.initializeController() + s.connection.On("WriteControl", websocket.PingMessage, mock.Anything).Return(nil) + + // Start the keepalive process in a separate goroutine + go func() { + err := controller.keepalive(ctx) + s.Require().NoError(err) + }() + + // Use Eventually to wait for some ping messages + expectedCalls := 3 // expected 3 ping messages for 30 seconds + s.Require().Eventually(func() bool { + return len(s.connection.Calls) == expectedCalls + }, time.Duration(expectedCalls)*PongWait, 1*time.Second, "not all ping messages were sent") + + s.connection.On("Close").Return(nil).Once() + controller.shutdownConnection() + + // Assert that the ping was sent + s.connection.AssertExpectations(s.T()) +} + +// TestKeepaliveError tests the behavior of the keepalive function when there is an error in writing the ping. +func (s *ControllerSuite) TestKeepaliveError() { + controller := s.initializeController() + + // Setup the mock connection with an error + s.connection.On("WriteControl", websocket.PingMessage, mock.Anything).Return(assert.AnError).Once() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + expectedError := fmt.Errorf("failed to write ping message: %w", assert.AnError) + // Start the keepalive process + err := controller.keepalive(ctx) + s.Require().Error(err) + s.Require().Equal(expectedError, err) + + // Assert expectations + s.connection.AssertExpectations(s.T()) +} + +// TestKeepaliveContextCancel tests the behavior of keepalive when the context is canceled before a ping is sent and +// no ping message is sent after the context is canceled. +func (s *ControllerSuite) TestKeepaliveContextCancel() { + controller := s.initializeController() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Immediately cancel the context + + // Start the keepalive process with the context canceled + err := controller.keepalive(ctx) + s.Require().NoError(err) + + // Assert expectations + s.connection.AssertExpectations(s.T()) // Should not invoke WriteMessage after context cancellation +} + +// initializeController initializes the WebSocket controller. +func (s *ControllerSuite) initializeController() *Controller { + return NewWebSocketController(s.logger, s.config, s.connection, s.dataProviderFactory) +} + +// mockDataProviderSetup is a helper which mocks a blocks data provider setup. +func (s *ControllerSuite) mockBlockDataProviderSetup(id uuid.UUID) *dpmock.DataProvider { + dataProvider := dpmock.NewDataProvider(s.T()) + dataProvider.On("ID").Return(id).Once() + dataProvider.On("Close").Return(nil).Once() + s.dataProviderFactory.On("NewDataProvider", mock.Anything, dp.BlocksTopic, mock.Anything, mock.Anything). + Return(dataProvider, nil).Once() + dataProvider.On("Run").Return(nil).Once() + + return dataProvider +} + +// mockConnectionSetup is a helper which mocks connection setup for SetReadDeadline and SetPongHandler. +func (s *ControllerSuite) mockConnectionSetup() { + s.connection.On("SetReadDeadline", mock.Anything).Return(nil).Once() + s.connection.On("SetPongHandler", mock.AnythingOfType("func(string) error")).Return(nil).Once() +} diff --git a/engine/access/rest/websockets/handler.go b/engine/access/rest/websockets/handler.go index c93548d5f9e..8dbe13078ad 100644 --- a/engine/access/rest/websockets/handler.go +++ b/engine/access/rest/websockets/handler.go @@ -61,6 +61,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - controller := NewWebSocketController(logger, h.websocketConfig, conn, h.dataProviderFactory) + controller := NewWebSocketController(logger, h.websocketConfig, NewWebsocketConnection(conn), h.dataProviderFactory) controller.HandleConnection(context.TODO()) } diff --git a/engine/access/rest/websockets/legacy/websocket_handler.go b/engine/access/rest/websockets/legacy/websocket_handler.go index 7132314b16c..06aa8323de4 100644 --- a/engine/access/rest/websockets/legacy/websocket_handler.go +++ b/engine/access/rest/websockets/legacy/websocket_handler.go @@ -12,6 +12,7 @@ import ( "go.uber.org/atomic" "github.com/onflow/flow-go/engine/access/rest/common" + "github.com/onflow/flow-go/engine/access/rest/websockets" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/engine/access/subscription" @@ -19,17 +20,6 @@ import ( "github.com/onflow/flow-go/model/flow" ) -const ( - // Time allowed to read the next pong message from the peer. - pongWait = 10 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second -) - // WebsocketController holds the necessary components and parameters for handling a WebSocket subscription. // It manages the communication between the server and the WebSocket client for subscribing. type WebsocketController struct { @@ -47,17 +37,17 @@ type WebsocketController struct { // manage incoming Pong messages. These methods allow to specify a time limit for reading from or writing to a WebSocket // connection. If the operation (reading or writing) takes longer than the specified deadline, the connection will be closed. func (wsController *WebsocketController) SetWebsocketConf() error { - err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait)) // Set the initial write deadline for the first ping message + err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait)) // Set the initial write deadline for the first ping message if err != nil { return common.NewRestError(http.StatusInternalServerError, "Set the initial write deadline error: ", err) } - err = wsController.conn.SetReadDeadline(time.Now().Add(pongWait)) // Set the initial read deadline for the first pong message + err = wsController.conn.SetReadDeadline(time.Now().Add(websockets.PongWait)) // Set the initial read deadline for the first pong message if err != nil { return common.NewRestError(http.StatusInternalServerError, "Set the initial read deadline error: ", err) } // Establish a Pong handler wsController.conn.SetPongHandler(func(string) error { - err := wsController.conn.SetReadDeadline(time.Now().Add(pongWait)) + err := wsController.conn.SetReadDeadline(time.Now().Add(websockets.PongWait)) if err != nil { return err } @@ -111,7 +101,7 @@ func (wsController *WebsocketController) wsErrorHandler(err error) { // If an error occurs or the subscription channel is closed, it handles the error or termination accordingly. // The function uses a ticker to periodically send ping messages to the client to maintain the connection. func (wsController *WebsocketController) writeEvents(sub subscription.Subscription) { - ticker := time.NewTicker(pingPeriod) + ticker := time.NewTicker(websockets.PingPeriod) defer ticker.Stop() blocksSinceLastMessage := uint64(0) @@ -137,7 +127,7 @@ func (wsController *WebsocketController) writeEvents(sub subscription.Subscripti wsController.wsErrorHandler(common.NewRestError(http.StatusRequestTimeout, "subscription channel closed", err)) return } - err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait)) if err != nil { wsController.wsErrorHandler(common.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) return @@ -178,7 +168,7 @@ func (wsController *WebsocketController) writeEvents(sub subscription.Subscripti return } case <-ticker.C: - err := wsController.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := wsController.conn.SetWriteDeadline(time.Now().Add(websockets.WriteWait)) if err != nil { wsController.wsErrorHandler(common.NewRestError(http.StatusInternalServerError, "failed to set the initial write deadline: ", err)) return diff --git a/engine/access/rest/websockets/mock/websocket_connection.go b/engine/access/rest/websockets/mock/websocket_connection.go new file mode 100644 index 00000000000..02a60fd0a3c --- /dev/null +++ b/engine/access/rest/websockets/mock/websocket_connection.go @@ -0,0 +1,141 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// WebsocketConnection is an autogenerated mock type for the WebsocketConnection type +type WebsocketConnection struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *WebsocketConnection) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ReadJSON provides a mock function with given fields: v +func (_m *WebsocketConnection) ReadJSON(v interface{}) error { + ret := _m.Called(v) + + if len(ret) == 0 { + panic("no return value specified for ReadJSON") + } + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(v) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetPongHandler provides a mock function with given fields: h +func (_m *WebsocketConnection) SetPongHandler(h func(string) error) { + _m.Called(h) +} + +// SetReadDeadline provides a mock function with given fields: deadline +func (_m *WebsocketConnection) SetReadDeadline(deadline time.Time) error { + ret := _m.Called(deadline) + + if len(ret) == 0 { + panic("no return value specified for SetReadDeadline") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(deadline) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetWriteDeadline provides a mock function with given fields: deadline +func (_m *WebsocketConnection) SetWriteDeadline(deadline time.Time) error { + ret := _m.Called(deadline) + + if len(ret) == 0 { + panic("no return value specified for SetWriteDeadline") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(deadline) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// WriteControl provides a mock function with given fields: messageType, deadline +func (_m *WebsocketConnection) WriteControl(messageType int, deadline time.Time) error { + ret := _m.Called(messageType, deadline) + + if len(ret) == 0 { + panic("no return value specified for WriteControl") + } + + var r0 error + if rf, ok := ret.Get(0).(func(int, time.Time) error); ok { + r0 = rf(messageType, deadline) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// WriteJSON provides a mock function with given fields: v +func (_m *WebsocketConnection) WriteJSON(v interface{}) error { + ret := _m.Called(v) + + if len(ret) == 0 { + panic("no return value specified for WriteJSON") + } + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(v) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewWebsocketConnection creates a new instance of WebsocketConnection. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWebsocketConnection(t interface { + mock.TestingT + Cleanup(func()) +}) *WebsocketConnection { + mock := &WebsocketConnection{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}