Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement inactivity tracker #6817

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
40f0c2c
Implement limit tracker
Guitarheroua Dec 4, 2024
2534eaf
Added inactivity timeout floag for access and observer nodes
UlyanaAndrukhiv Dec 12, 2024
09627f6
Merged with krok/new-websockets
UlyanaAndrukhiv Dec 12, 2024
2f2bc98
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
AndriiDiachuk Dec 13, 2024
7179cb8
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into krok/new…
AndriiDiachuk Dec 13, 2024
8fd7faf
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
UlyanaAndrukhiv Dec 13, 2024
279ee14
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 13, 2024
d6fe8f5
Added inactivity tracker impl
UlyanaAndrukhiv Dec 13, 2024
d86d90e
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
UlyanaAndrukhiv Dec 13, 2024
341195c
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
Guitarheroua Dec 13, 2024
364b35e
added comments and error message
Guitarheroua Dec 13, 2024
09f1705
remove comments
illia-malachyn Dec 13, 2024
f1849c2
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
16f5734
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
8d6649d
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
6bb1143
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
0b20427
Merged with illia-malachyn/6642-ws-controller-error-handling
UlyanaAndrukhiv Dec 16, 2024
eaa649f
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' of gi…
UlyanaAndrukhiv Dec 16, 2024
bbafe48
Added init tests for inactivity tracker
UlyanaAndrukhiv Dec 16, 2024
5fa4e47
Merge branch 'AndriiSlisarchuk/6640-responce-limit-tracker' into krok…
Guitarheroua Dec 17, 2024
76ba6b2
Fixed test
UlyanaAndrukhiv Dec 17, 2024
53e3461
Merged with krok/new-websockets
UlyanaAndrukhiv Dec 17, 2024
bc38455
Merged with master
UlyanaAndrukhiv Dec 18, 2024
25a3b5c
Updated inactivity tracker for websockets according to comments
UlyanaAndrukhiv Dec 19, 2024
d388d79
Merged with illia-malachyn/6642-ws-controller-error-handling
UlyanaAndrukhiv Dec 19, 2024
72f1e1b
Updated according to comments
UlyanaAndrukhiv Dec 19, 2024
fc086df
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Dec 23, 2024
9a91eb1
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlianaAn…
UlyanaAndrukhiv Dec 26, 2024
20c3e06
Updated acccording to comments
UlyanaAndrukhiv Dec 26, 2024
b808c0c
Merged with master
UlyanaAndrukhiv Dec 30, 2024
bbc23ea
Updated comment according to suggestion, updated log message
UlyanaAndrukhiv Dec 30, 2024
4204718
Added missed godoc for websocket DefaultInactivityTimeout, removed ra…
UlyanaAndrukhiv Dec 30, 2024
807e589
Updated comments and log according to suggestions
UlyanaAndrukhiv Jan 2, 2025
946e02d
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Jan 2, 2025
a2b56c2
Removed comment and improved the readability of the code
UlyanaAndrukhiv Jan 2, 2025
1c4f2a7
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/onflow/flow-go/engine/access/rest"
commonrest "github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
Expand Down Expand Up @@ -227,8 +228,9 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
IdleTimeout: rest.DefaultIdleTimeout,
MaxRequestSize: commonrest.DefaultMaxRequestSize,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
},
stateStreamConf: statestreambackend.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -1450,6 +1452,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
12 changes: 8 additions & 4 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ type ObserverServiceConfig struct {
registerCacheSize uint
programCacheSize uint
registerDBPruneThreshold uint64
websocketConfig websockets.Config
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -200,8 +199,9 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
IdleTimeout: rest.DefaultIdleTimeout,
MaxRequestSize: commonrest.DefaultMaxRequestSize,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
CompressorName: grpcutils.NoCompressor,
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
},
stateStreamConf: statestreambackend.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -254,7 +254,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
registerCacheSize: 0,
programCacheSize: 0,
registerDBPruneThreshold: pruner.DefaultThreshold,
websocketConfig: websockets.NewDefaultWebsocketConfig(),
}
}

Expand Down Expand Up @@ -814,6 +813,11 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down
8 changes: 8 additions & 0 deletions engine/access/rest/websockets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,24 @@ const (
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
WriteWait = 10 * time.Second

// DefaultInactivityTimeout is the default duration a WebSocket connection can remain open without any active subscriptions
// before being automatically closed
DefaultInactivityTimeout time.Duration = 1 * time.Minute
)

type Config struct {
MaxSubscriptionsPerConnection uint64
MaxResponsesPerSecond uint64
// InactivityTimeout specifies the duration a WebSocket connection can remain open without any active subscriptions
// before being automatically closed
InactivityTimeout time.Duration
}

func NewDefaultWebsocketConfig() Config {
return Config{
MaxSubscriptionsPerConnection: 1000,
MaxResponsesPerSecond: 1000,
InactivityTimeout: DefaultInactivityTimeout,
}
}
29 changes: 27 additions & 2 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *Controller) HandleConnection(ctx context.Context) {

err := c.configureKeepalive()
if err != nil {
c.logger.Error().Err(err).Msg("error configuring connection")
c.logger.Error().Err(err).Msg("error configuring keepalive connection")
return
}

Expand Down Expand Up @@ -237,8 +237,16 @@ func (c *Controller) keepalive(ctx context.Context) error {
}

// writeMessages reads a messages from multiplexed stream and passes them on to a client WebSocket connection.
// The multiplexed stream channel is filled by data providers
// The multiplexed stream channel is filled by data providers.
// The function tracks the last message sent and periodically checks for inactivity.
// If no messages are sent within InactivityTimeout and no active data providers exist,
// the connection will be closed.
func (c *Controller) writeMessages(ctx context.Context) error {
inactivityTicker := time.NewTicker(c.config.InactivityTimeout / 10)
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
defer inactivityTicker.Stop()

lastMessageSentAt := time.Now()

defer func() {
// drain the channel as some providers may still send data to it after this routine shutdowns
// so, in order to not run into deadlock there should be at least 1 reader on the channel
Expand All @@ -257,13 +265,30 @@ func (c *Controller) writeMessages(ctx context.Context) error {
return nil
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
return fmt.Errorf("failed to set the write deadline: %w", err)
}

if err := c.conn.WriteJSON(message); err != nil {
return err
}

lastMessageSentAt = time.Now()

case <-inactivityTicker.C:
hasNoActiveSubscriptions := c.dataProviders.Size() == 0
exceedsInactivityTimeout := time.Since(lastMessageSentAt) > c.config.InactivityTimeout
if hasNoActiveSubscriptions && exceedsInactivityTimeout {
c.logger.Debug().
Dur("timeout", c.config.InactivityTimeout).
Msg("connection inactive, closing due to timeout")
return fmt.Errorf("no recent activity for %v", c.config.InactivityTimeout)
}
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
Expand Down Expand Up @@ -809,6 +808,37 @@ func (s *WsControllerSuite) TestControllerShutdown() {

conn.AssertExpectations(t)
})

s.T().Run("Inactivity tracking", func(t *testing.T) {
peterargue marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

conn := connmock.NewWebsocketConnection(t)
conn.On("Close").Return(nil).Once()
conn.On("SetReadDeadline", mock.Anything).Return(nil).Once()
conn.On("SetPongHandler", mock.AnythingOfType("func(string) error")).Return(nil).Once()

factory := dpmock.NewDataProviderFactory(t)
// Mock with short inactivity timeout for testing
wsConfig := s.wsConfig

wsConfig.InactivityTimeout = 50 * time.Millisecond
controller := NewWebSocketController(s.logger, wsConfig, conn, factory)

conn.
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
On("ReadJSON", mock.Anything).
Return(func(interface{}) error {
// waiting more than InactivityTimeout to make sure that read message routine busy and do not return
// an error before than inactivity tracker initiate shut down
<-time.After(wsConfig.InactivityTimeout)
return websocket.ErrCloseSent
}).
Once()

controller.HandleConnection(context.Background())
time.Sleep(wsConfig.InactivityTimeout)

conn.AssertExpectations(t)
})
}

func (s *WsControllerSuite) TestKeepaliveRoutine() {
Expand Down
Loading