Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Response limit tracker #6814

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
"sync"
"time"

"golang.org/x/time/rate"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -130,6 +132,7 @@
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
dataProvidersGroup *sync.WaitGroup
limiter *rate.Limiter
}

func NewWebSocketController(
Expand All @@ -146,6 +149,7 @@
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProviderFactory: dataProviderFactory,
dataProvidersGroup: &sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.MaxResponsesPerSecond), 1),
}
}

Expand Down Expand Up @@ -264,6 +268,10 @@
if !ok {
return nil
}

Check failure on line 271 in engine/access/rest/websockets/controller.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not `goimports`-ed with -local github.com/onflow/flow-go/ (goimports)
if err := c.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limiter wait failed: %w", err)
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
Expand Down
60 changes: 60 additions & 0 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,66 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
})
}

// TestRateLimiter tests the rate-limiting functionality of the WebSocket controller.
//
// Test Steps:
// 1. Create a mock WebSocket connection with behavior for `SetWriteDeadline` and `WriteJSON`.
// 2. Configure the WebSocket controller with a rate limit of 2 responses per second.
// 3. Simulate sending messages to the `multiplexedStream` channel.
// 4. Collect timestamps of message writes to verify rate-limiting behavior.
// 5. Assert that all messages are processed and that the delay between messages respects the configured rate limit.
//
// The test ensures that:
// - The number of messages processed matches the total messages sent.
// - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms.
func (s *WsControllerSuite) TestRateLimiter() {
s.T().Run("Enforces response rate limit", func(t *testing.T) {
totalMessages := 5 // Number of messages to simulate.

// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets also check that we indeed receive expected messages in expected order at this place.

}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could possibly simplify next statement using: https://github.com/stretchr/testify/blob/master/require/require.go#L869

assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit")
}
})
Comment on lines +684 to +728
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
s.T().Run("Enforces response rate limit", func(t *testing.T) {
totalMessages := 5 // Number of messages to simulate.
// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)
// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)
// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()
// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())
}).Return(nil).Times(totalMessages)
// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())
// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")
// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation.
// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit")
assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit")
}
})
totalMessages := 5 // Number of messages to simulate.
// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)
// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)
// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()
// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())
}).Return(nil).Times(totalMessages)
// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())
// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")
// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation.
// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit")
assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit")
}

}

// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
func (s *WsControllerSuite) TestConfigureKeepaliveConnection() {
s.T().Run("Happy path", func(t *testing.T) {
Expand Down
Loading