diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 9c53aada9f6..8809dabd120 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -2,12 +2,14 @@ package backend import ( "context" + "crypto/md5" //nolint:gosec "fmt" "net" "strconv" "time" - lru "github.com/hashicorp/golang-lru" + lru2 "github.com/hashicorp/golang-lru/v2" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,8 +24,6 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" - - accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) // minExecutionNodesCnt is the minimum number of execution nodes expected to have sent the execution receipt for a block @@ -119,7 +119,7 @@ func New( retry.Activate() } - loggedScripts, err := lru.New(DefaultLoggedScriptsCacheSize) + loggedScripts, err := lru2.New[[md5.Size]byte, time.Time](DefaultLoggedScriptsCacheSize) if err != nil { log.Fatal().Err(err).Msg("failed to initialize script logging cache") } @@ -228,14 +228,13 @@ func NewCache( log zerolog.Logger, accessMetrics module.AccessMetrics, connectionPoolSize uint, -) (*lru.Cache, uint, error) { +) (*lru2.Cache[string, *connection.CachedClient], uint, error) { - var cache *lru.Cache + var cache *lru2.Cache[string, *connection.CachedClient] cacheSize := connectionPoolSize if cacheSize > 0 { var err error - cache, err = lru.NewWithEvict(int(cacheSize), func(_, evictedValue interface{}) { - store := evictedValue.(*connection.CachedClient) + cache, err = lru2.NewWithEvict[string, *connection.CachedClient](int(cacheSize), func(_ string, store *connection.CachedClient) { store.Close() log.Debug().Str("grpc_conn_evicted", store.Address).Msg("closing grpc connection evicted from pool") if accessMetrics != nil { diff --git a/engine/access/rpc/backend/backend_block_details.go b/engine/access/rpc/backend/backend_block_details.go index 19336ded8a4..fc9eee618c4 100644 --- a/engine/access/rpc/backend/backend_block_details.go +++ b/engine/access/rpc/backend/backend_block_details.go @@ -49,11 +49,11 @@ func (b *backendBlockDetails) GetLatestBlock(_ context.Context, isSealed bool) ( return nil, flow.BlockStatusUnknown, status.Errorf(codes.Internal, "could not get latest block: %v", err) } - status, err := b.getBlockStatus(block) + stat, err := b.getBlockStatus(block) if err != nil { - return nil, status, err + return nil, stat, err } - return block, status, nil + return block, stat, nil } func (b *backendBlockDetails) GetBlockByID(_ context.Context, id flow.Identifier) (*flow.Block, flow.BlockStatus, error) { @@ -62,11 +62,11 @@ func (b *backendBlockDetails) GetBlockByID(_ context.Context, id flow.Identifier return nil, flow.BlockStatusUnknown, rpc.ConvertStorageError(err) } - status, err := b.getBlockStatus(block) + stat, err := b.getBlockStatus(block) if err != nil { - return nil, status, err + return nil, stat, err } - return block, status, nil + return block, stat, nil } func (b *backendBlockDetails) GetBlockByHeight(_ context.Context, height uint64) (*flow.Block, flow.BlockStatus, error) { @@ -75,11 +75,11 @@ func (b *backendBlockDetails) GetBlockByHeight(_ context.Context, height uint64) return nil, flow.BlockStatusUnknown, rpc.ConvertStorageError(err) } - status, err := b.getBlockStatus(block) + stat, err := b.getBlockStatus(block) if err != nil { - return nil, status, err + return nil, 
stat, err } - return block, status, nil + return block, stat, nil } func (b *backendBlockDetails) getBlockStatus(block *flow.Block) (flow.BlockStatus, error) { diff --git a/engine/access/rpc/backend/backend_block_headers.go b/engine/access/rpc/backend/backend_block_headers.go index 178f9064f1f..ac4116224d4 100644 --- a/engine/access/rpc/backend/backend_block_headers.go +++ b/engine/access/rpc/backend/backend_block_headers.go @@ -42,11 +42,11 @@ func (b *backendBlockHeaders) GetLatestBlockHeader(_ context.Context, isSealed b return nil, flow.BlockStatusUnknown, status.Errorf(codes.Internal, "could not get latest block header: %v", err) } - status, err := b.getBlockStatus(header) + stat, err := b.getBlockStatus(header) if err != nil { - return nil, status, err + return nil, stat, err } - return header, status, nil + return header, stat, nil } func (b *backendBlockHeaders) GetBlockHeaderByID(_ context.Context, id flow.Identifier) (*flow.Header, flow.BlockStatus, error) { @@ -55,11 +55,11 @@ func (b *backendBlockHeaders) GetBlockHeaderByID(_ context.Context, id flow.Iden return nil, flow.BlockStatusUnknown, rpc.ConvertStorageError(err) } - status, err := b.getBlockStatus(header) + stat, err := b.getBlockStatus(header) if err != nil { - return nil, status, err + return nil, stat, err } - return header, status, nil + return header, stat, nil } func (b *backendBlockHeaders) GetBlockHeaderByHeight(_ context.Context, height uint64) (*flow.Header, flow.BlockStatus, error) { @@ -68,11 +68,11 @@ func (b *backendBlockHeaders) GetBlockHeaderByHeight(_ context.Context, height u return nil, flow.BlockStatusUnknown, rpc.ConvertStorageError(err) } - status, err := b.getBlockStatus(header) + stat, err := b.getBlockStatus(header) if err != nil { - return nil, status, err + return nil, stat, err } - return header, status, nil + return header, stat, nil } func (b *backendBlockHeaders) getBlockStatus(header *flow.Header) (flow.BlockStatus, error) { @@ -80,7 +80,7 @@ func (b *backendBlockHeaders) getBlockStatus(header *flow.Header) (flow.BlockSta if err != nil { // In the RPC engine, if we encounter an error from the protocol state indicating state corruption, // we should halt processing requests, but do throw an exception which might cause a crash: - // - It is unsafe to process requests if we have an internally bad state. + // - It is unsafe to process requests if we have an internally bad State. 
// TODO: https://github.com/onflow/flow-go/issues/4028 // - We would like to avoid throwing an exception as a result of an Access API request by policy // because this can cause DOS potential diff --git a/engine/access/rpc/backend/backend_network.go b/engine/access/rpc/backend/backend_network.go index 577ebaa8a84..cb704d93278 100644 --- a/engine/access/rpc/backend/backend_network.go +++ b/engine/access/rpc/backend/backend_network.go @@ -4,12 +4,11 @@ import ( "context" "fmt" - "github.com/onflow/flow-go/cmd/build" - "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/cmd/build" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/state/protocol" diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 2d875a460ea..0bceaac3815 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -8,7 +8,7 @@ import ( "time" "github.com/hashicorp/go-multierror" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "github.com/onflow/flow/protobuf/go/flow/access" execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/rs/zerolog" @@ -27,13 +27,14 @@ import ( const uniqueScriptLoggingTimeWindow = 10 * time.Minute type backendScripts struct { - headers storage.Headers - executionReceipts storage.ExecutionReceipts - state protocol.State - connFactory connection.ConnectionFactory - log zerolog.Logger - metrics module.BackendScriptsMetrics - loggedScripts *lru.Cache + headers storage.Headers + executionReceipts storage.ExecutionReceipts + state protocol.State + connFactory connection.ConnectionFactory + log zerolog.Logger + metrics module.BackendScriptsMetrics + loggedScripts *lru.Cache[[md5.Size]byte, time.Time] + archiveAddressList []string archivePorts []uint scriptExecValidation bool @@ -219,7 +220,7 @@ func (b *backendScripts) executeScriptOnAvailableArchiveNodes( Msg("script failed to execute on the execution node") return nil, err case codes.NotFound: - // failures due to unavailable blocks are explicitly marked Not found + // failures due to unavailable blocks are explicitly marked Not found b.metrics.ScriptExecutionErrorOnArchiveNode() b.log.Error().Err(err).Msg("script execution failed for archive node") return nil, err @@ -310,14 +311,11 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes( // shouldLogScript checks if the script hash is unique in the time window func (b *backendScripts) shouldLogScript(execTime time.Time, scriptHash [16]byte) bool { - rawTimestamp, seen := b.loggedScripts.Get(scriptHash) - if !seen || rawTimestamp == nil { + timestamp, seen := b.loggedScripts.Get(scriptHash) + if !seen { return true - } else { - // safe cast - timestamp := rawTimestamp.(time.Time) - return execTime.Sub(timestamp) >= uniqueScriptLoggingTimeWindow } + return execTime.Sub(timestamp) >= uniqueScriptLoggingTimeWindow } func (b *backendScripts) tryExecuteScriptOnExecutionNode( diff --git a/engine/access/rpc/connection/cache.go b/engine/access/rpc/connection/cache.go index 9aa53d3d251..133cac9dd12 100644 --- a/engine/access/rpc/connection/cache.go +++ b/engine/access/rpc/connection/cache.go @@ -4,7 +4,7 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru/v2" "go.uber.org/atomic" "google.golang.org/grpc" ) @@ -38,12 +38,12 @@ func (s *CachedClient) 
Close() { // Cache represents a cache of CachedClient instances with a given maximum size. type Cache struct { - cache *lru.Cache + cache *lru.Cache[string, *CachedClient] size int } // NewCache creates a new Cache with the specified maximum size and the underlying LRU cache. -func NewCache(cache *lru.Cache, size int) *Cache { +func NewCache(cache *lru.Cache[string, *CachedClient], size int) *Cache { return &Cache{ cache: cache, size: size, @@ -57,7 +57,7 @@ func (c *Cache) Get(address string) (*CachedClient, bool) { if !ok { return nil, false } - return val.(*CachedClient), true + return val, true } // GetOrAdd atomically gets the CachedClient for the given address from the cache, or adds a new one @@ -71,7 +71,7 @@ func (c *Cache) GetOrAdd(address string, timeout time.Duration) (*CachedClient, val, existed, _ := c.cache.PeekOrAdd(address, client) if existed { - return val.(*CachedClient), true + return val, true } client.Address = address diff --git a/engine/access/rpc/connection/connection_test.go b/engine/access/rpc/connection/connection_test.go index a961816605e..d821638fd10 100644 --- a/engine/access/rpc/connection/connection_test.go +++ b/engine/access/rpc/connection/connection_test.go @@ -4,29 +4,25 @@ import ( "context" "fmt" "net" - "strconv" - "strings" "sync" "testing" "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/sony/gobreaker" - - "go.uber.org/atomic" - "pgregory.net/rapid" - - lru "github.com/hashicorp/golang-lru" - "github.com/onflow/flow/protobuf/go/flow/access" - "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/stretchr/testify/assert" testifymock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" + "pgregory.net/rapid" + + "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/execution" - "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/utils/unittest" ) @@ -113,6 +109,14 @@ func TestProxyExecutionAPI(t *testing.T) { assert.Equal(t, resp, expected) } +func getCache(t *testing.T, cacheSize int) *lru.Cache[string, *CachedClient] { + cache, err := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { + client.Close() + }) + require.NoError(t, err) + return cache +} + func TestProxyAccessAPIConnectionReuse(t *testing.T) { // create a collection node cn := new(collectionNode) @@ -129,10 +133,7 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) { connectionFactory.CollectionGRPCPort = cn.port // set the connection pool cache size cacheSize := 1 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() @@ -184,10 +185,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) { connectionFactory.ExecutionGRPCPort = en.port // set the connection pool cache size cacheSize := 5 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = 
metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -245,10 +243,7 @@ func TestExecutionNodeClientTimeout(t *testing.T) { connectionFactory.ExecutionNodeGRPCTimeout = timeout // set the connection pool cache size cacheSize := 5 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -294,10 +289,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) { connectionFactory.CollectionNodeGRPCTimeout = timeout // set the connection pool cache size cacheSize := 5 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -343,10 +335,8 @@ func TestConnectionPoolFull(t *testing.T) { connectionFactory.CollectionGRPCPort = cn1.port // set the connection pool cache size cacheSize := 2 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -419,10 +409,8 @@ func TestConnectionPoolStale(t *testing.T) { connectionFactory.CollectionGRPCPort = cn.port // set the connection pool cache size cacheSize := 5 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + + connectionCache := NewCache(getCache(t, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -477,18 +465,18 @@ func TestConnectionPoolStale(t *testing.T) { // - Invalidate the execution API client. // - Wait for all goroutines to finish. // - Verify that the number of completed requests matches the number of sent responses. 
-func TestExecutionNodeClientClosedGracefully(t *testing.T) { +func TestExecutionNodeClientClosedGracefully(tt *testing.T) { // Add createExecNode function to recreate it each time for rapid test createExecNode := func() (*executionNode, func()) { en := new(executionNode) - en.start(t) + en.start(tt) return en, func() { - en.stop(t) + en.stop(tt) } } // Add rapid test, to check graceful close on different number of requests - rapid.Check(t, func(t *rapid.T) { + rapid.Check(tt, func(t *rapid.T) { en, closer := createExecNode() defer closer() @@ -508,10 +496,8 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) { connectionFactory.ExecutionNodeGRPCTimeout = time.Second // set the connection pool cache size cacheSize := 1 - cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { - evictedValue.(*CachedClient).Close() - }) - connectionCache := NewCache(cache, cacheSize) + + connectionCache := NewCache(getCache(tt, cacheSize), cacheSize) // set metrics reporting connectionFactory.AccessMetrics = metrics.NewNoopCollector() connectionFactory.Manager = NewManager( @@ -592,7 +578,7 @@ func TestExecutionEvictingCacheClients(t *testing.T) { connectionFactory.CollectionNodeGRPCTimeout = 5 * time.Second // Set the connection pool cache size cacheSize := 1 - cache, err := lru.New(cacheSize) + cache, err := lru.New[string, *CachedClient](cacheSize) require.NoError(t, err) connectionCache := NewCache(cache, cacheSize) @@ -614,9 +600,8 @@ func TestExecutionEvictingCacheClients(t *testing.T) { ctx := context.Background() // Retrieve the cached client from the cache - result, _ := cache.Get(clientAddress) - cachedClient := result.(*CachedClient) - + cachedClient, ok := cache.Get(clientAddress) + require.True(t, ok) // Schedule the invalidation of the access API client after a delay time.AfterFunc(250*time.Millisecond, func() { // Invalidate the access API client @@ -671,8 +656,8 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { // Set the connection pool cache size. cacheSize := 1 - connectionCache, _ := lru.New(cacheSize) - + connectionCache, err := lru.New[string, *CachedClient](cacheSize) + require.NoError(t, err) connectionFactory.Manager = NewManager( NewCache(connectionCache, cacheSize), unittest.Logger(), @@ -755,8 +740,9 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { // Set the connection pool cache size. 
cacheSize := 1 - connectionCache, _ := lru.New(cacheSize) + connectionCache, err := lru.New[string, *CachedClient](cacheSize) + require.NoError(t, err) connectionFactory.Manager = NewManager( NewCache(connectionCache, cacheSize), unittest.Logger(), @@ -812,79 +798,3 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { assert.Greater(t, requestTimeout, duration) assert.Equal(t, nil, err) } - -// node mocks a flow node that runs a GRPC server -type node struct { - server *grpc.Server - listener net.Listener - port uint -} - -func (n *node) setupNode(t *testing.T) { - n.server = grpc.NewServer() - listener, err := net.Listen("tcp4", unittest.DefaultAddress) - assert.NoError(t, err) - n.listener = listener - assert.Eventually(t, func() bool { - return !strings.HasSuffix(listener.Addr().String(), ":0") - }, time.Second*4, 10*time.Millisecond) - - _, port, err := net.SplitHostPort(listener.Addr().String()) - assert.NoError(t, err) - portAsUint, err := strconv.ParseUint(port, 10, 32) - assert.NoError(t, err) - n.port = uint(portAsUint) -} - -func (n *node) start(t *testing.T) { - // using a wait group here to ensure the goroutine has started before returning. Otherwise, - // there's a race condition where the server is sometimes stopped before it has started - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - wg.Done() - err := n.server.Serve(n.listener) - assert.NoError(t, err) - }() - unittest.RequireReturnsBefore(t, wg.Wait, 10*time.Millisecond, "could not start goroutine on time") -} - -func (n *node) stop(t *testing.T) { - if n.server != nil { - n.server.Stop() - } -} - -type executionNode struct { - node - handler *mock.ExecutionAPIServer -} - -func (en *executionNode) start(t *testing.T) { - en.setupNode(t) - handler := new(mock.ExecutionAPIServer) - execution.RegisterExecutionAPIServer(en.server, handler) - en.handler = handler - en.node.start(t) -} - -func (en *executionNode) stop(t *testing.T) { - en.node.stop(t) -} - -type collectionNode struct { - node - handler *mock.AccessAPIServer -} - -func (cn *collectionNode) start(t *testing.T) { - cn.setupNode(t) - handler := new(mock.AccessAPIServer) - access.RegisterAccessAPIServer(cn.server, handler) - cn.handler = handler - cn.node.start(t) -} - -func (cn *collectionNode) stop(t *testing.T) { - cn.node.stop(t) -} diff --git a/engine/access/rpc/connection/mock_node.go b/engine/access/rpc/connection/mock_node.go new file mode 100644 index 00000000000..e8929ab6ef3 --- /dev/null +++ b/engine/access/rpc/connection/mock_node.go @@ -0,0 +1,94 @@ +package connection + +import ( + "net" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/execution" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + + "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +// node mocks a flow node that runs a GRPC server +type node struct { + server *grpc.Server + listener net.Listener + port uint +} + +func (n *node) setupNode(t *testing.T) { + n.server = grpc.NewServer() + listener, err := net.Listen("tcp4", unittest.DefaultAddress) + assert.NoError(t, err) + n.listener = listener + assert.Eventually(t, func() bool { + return !strings.HasSuffix(listener.Addr().String(), ":0") + }, time.Second*4, 10*time.Millisecond) + + _, port, err := net.SplitHostPort(listener.Addr().String()) + assert.NoError(t, err) + portAsUint, err := strconv.ParseUint(port, 10, 32) + assert.NoError(t, err) + n.port = 
uint(portAsUint) +} + +func (n *node) start(t *testing.T) { + // using a wait group here to ensure the goroutine has started before returning. Otherwise, + // there's a race condition where the server is sometimes stopped before it has started + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + wg.Done() + err := n.server.Serve(n.listener) + assert.NoError(t, err) + }() + unittest.RequireReturnsBefore(t, wg.Wait, 10*time.Millisecond, "could not start goroutine on time") +} + +func (n *node) stop(t *testing.T) { + if n.server != nil { + n.server.Stop() + } +} + +type executionNode struct { + node + handler *mock.ExecutionAPIServer +} + +func (en *executionNode) start(t *testing.T) { + en.setupNode(t) + handler := new(mock.ExecutionAPIServer) + execution.RegisterExecutionAPIServer(en.server, handler) + en.handler = handler + en.node.start(t) +} + +func (en *executionNode) stop(t *testing.T) { + en.node.stop(t) +} + +type collectionNode struct { + node + handler *mock.AccessAPIServer +} + +func (cn *collectionNode) start(t *testing.T) { + cn.setupNode(t) + handler := new(mock.AccessAPIServer) + access.RegisterAccessAPIServer(cn.server, handler) + cn.handler = handler + cn.node.start(t) +} + +func (cn *collectionNode) stop(t *testing.T) { + cn.node.stop(t) +} diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index b4b1d1e6980..8eb1b51c064 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -31,8 +32,6 @@ import ( storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/unittest" - - accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) type RateLimitTestSuite struct { diff --git a/engine/consensus/approvals/approvals_lru_cache.go b/engine/consensus/approvals/approvals_lru_cache.go index f4b84d008a1..26fccc715ff 100644 --- a/engine/consensus/approvals/approvals_lru_cache.go +++ b/engine/consensus/approvals/approvals_lru_cache.go @@ -3,7 +3,7 @@ package approvals import ( "sync" - "github.com/hashicorp/golang-lru/simplelru" + "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/onflow/flow-go/model/flow" ) @@ -11,7 +11,7 @@ import ( // LruCache is a wrapper over `simplelru.LRUCache` that provides needed api for processing result approvals // Extends functionality of `simplelru.LRUCache` by introducing additional index for quicker access. 
type LruCache struct { - lru simplelru.LRUCache + lru simplelru.LRUCache[flow.Identifier, *flow.ResultApproval] lock sync.RWMutex // secondary index by result id, since multiple approvals could // reference same result @@ -21,8 +21,7 @@ type LruCache struct { func NewApprovalsLRUCache(limit uint) *LruCache { byResultID := make(map[flow.Identifier]map[flow.Identifier]struct{}) // callback has to be called while we are holding lock - lru, _ := simplelru.NewLRU(int(limit), func(key interface{}, value interface{}) { - approval := value.(*flow.ResultApproval) + lru, _ := simplelru.NewLRU(int(limit), func(key flow.Identifier, approval *flow.ResultApproval) { delete(byResultID[approval.Body.ExecutionResultID], approval.Body.PartialID()) if len(byResultID[approval.Body.ExecutionResultID]) == 0 { delete(byResultID, approval.Body.ExecutionResultID) @@ -40,7 +39,7 @@ func (c *LruCache) Peek(approvalID flow.Identifier) *flow.ResultApproval { // check if we have it in the cache resource, cached := c.lru.Peek(approvalID) if cached { - return resource.(*flow.ResultApproval) + return resource } return nil @@ -52,7 +51,7 @@ func (c *LruCache) Get(approvalID flow.Identifier) *flow.ResultApproval { // check if we have it in the cache resource, cached := c.lru.Get(approvalID) if cached { - return resource.(*flow.ResultApproval) + return resource } return nil @@ -74,7 +73,7 @@ func (c *LruCache) TakeByResultID(resultID flow.Identifier) []*flow.ResultApprov // no need to cleanup secondary index since it will be // cleaned up in evict callback _ = c.lru.Remove(approvalID) - approvals = append(approvals, resource.(*flow.ResultApproval)) + approvals = append(approvals, resource) } } diff --git a/engine/consensus/mock/proposal_provider.go b/engine/consensus/mock/proposal_provider.go index b53cef236e1..ad0b3d5923b 100644 --- a/engine/consensus/mock/proposal_provider.go +++ b/engine/consensus/mock/proposal_provider.go @@ -3,8 +3,9 @@ package mock import ( - messages "github.com/onflow/flow-go/model/messages" mock "github.com/stretchr/testify/mock" + + messages "github.com/onflow/flow-go/model/messages" ) // ProposalProvider is an autogenerated mock type for the ProposalProvider type diff --git a/fvm/storage/derived/derived_chain_data.go b/fvm/storage/derived/derived_chain_data.go index a3ec9a488df..8bcf0b03d00 100644 --- a/fvm/storage/derived/derived_chain_data.go +++ b/fvm/storage/derived/derived_chain_data.go @@ -4,7 +4,7 @@ import ( "fmt" "sync" - "github.com/hashicorp/golang-lru/simplelru" + "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/onflow/flow-go/model/flow" ) @@ -21,11 +21,11 @@ type DerivedChainData struct { // on Get. 
mutex sync.Mutex - lru *simplelru.LRU + lru *simplelru.LRU[flow.Identifier, *DerivedBlockData] } func NewDerivedChainData(chainCacheSize uint) (*DerivedChainData, error) { - lru, err := simplelru.NewLRU(int(chainCacheSize), nil) + lru, err := simplelru.NewLRU[flow.Identifier, *DerivedBlockData](int(chainCacheSize), nil) if err != nil { return nil, fmt.Errorf("cannot create LRU cache: %w", err) } @@ -40,7 +40,7 @@ func (chain *DerivedChainData) unsafeGet( ) *DerivedBlockData { currentEntry, ok := chain.lru.Get(currentBlockId) if ok { - return currentEntry.(*DerivedBlockData) + return currentEntry } return nil @@ -70,7 +70,7 @@ func (chain *DerivedChainData) GetOrCreateDerivedBlockData( var current *DerivedBlockData parentEntry, ok := chain.lru.Get(parentBlockId) if ok { - current = parentEntry.(*DerivedBlockData).NewChildDerivedBlockData() + current = parentEntry.NewChildDerivedBlockData() } else { current = NewEmptyDerivedBlockData(0) }
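The hunks above all follow the same pattern: golang-lru v1 stored interface{} values that had to be type-asserted on every read, while v2 is generic, so keys, values, and eviction callbacks are statically typed and the assertions can be deleted. The standalone sketch below is illustrative only and not part of this patch; cachedConn and the node addresses are placeholders, not flow-go types. It shows the v2 constructor and accessor shapes the patch relies on:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// cachedConn is a stand-in for a cached client type such as connection.CachedClient.
type cachedConn struct {
	Address string
}

func main() {
	// Typed constructor with an eviction callback: the callback receives the
	// concrete key and value types, so no type assertion is needed on eviction.
	cache, err := lru.NewWithEvict[string, *cachedConn](2, func(addr string, c *cachedConn) {
		fmt.Println("evicted connection to", addr)
	})
	if err != nil {
		panic(err)
	}

	cache.Add("node-1:9000", &cachedConn{Address: "node-1:9000"})
	cache.Add("node-2:9000", &cachedConn{Address: "node-2:9000"})
	cache.Add("node-3:9000", &cachedConn{Address: "node-3:9000"}) // evicts node-1

	// Get returns the typed value directly, no interface{} assertion required.
	if c, ok := cache.Get("node-2:9000"); ok {
		fmt.Println("cached:", c.Address)
	}
}

The simplelru.NewLRU constructor used in the approvals and derived-data caches takes the same generic form, with flow.Identifier keys in those files.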