Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Upgrade lru cache v2 #4700

Merged
merged 17 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package backend

import (
"context"
"crypto/md5" //nolint:gosec
"fmt"
"net"
"strconv"
"time"

lru "github.com/hashicorp/golang-lru"
lru2 "github.com/hashicorp/golang-lru/v2"
lru "github.com/hashicorp/golang-lru/v2"
accessproto "github.com/onflow/flow/protobuf/go/flow/access"
"github.com/rs/zerolog"

Expand Down Expand Up @@ -124,7 +124,7 @@ func New(params Params) (*Backend, error) {
retry.Activate()
}

loggedScripts, err := lru.New(DefaultLoggedScriptsCacheSize)
loggedScripts, err := lru.New[[md5.Size]byte, time.Time](DefaultLoggedScriptsCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to initialize script logging cache: %w", err)
}
Expand All @@ -138,9 +138,9 @@ func New(params Params) (*Backend, error) {
archivePorts[idx] = port
}

var txResCache *lru2.Cache[flow.Identifier, *access.TransactionResult]
var txResCache *lru.Cache[flow.Identifier, *access.TransactionResult]
if params.TxResultCacheSize > 0 {
txResCache, err = lru2.New[flow.Identifier, *access.TransactionResult](int(params.TxResultCacheSize))
txResCache, err = lru.New[flow.Identifier, *access.TransactionResult](int(params.TxResultCacheSize))
if err != nil {
return nil, fmt.Errorf("failed to init cache for transaction results: %w", err)
}
Expand Down Expand Up @@ -246,14 +246,13 @@ func NewCache(
log zerolog.Logger,
accessMetrics module.AccessMetrics,
connectionPoolSize uint,
) (*lru.Cache, uint, error) {
) (*lru.Cache[string, *connection.CachedClient], uint, error) {

var cache *lru.Cache
var cache *lru.Cache[string, *connection.CachedClient]
cacheSize := connectionPoolSize
if cacheSize > 0 {
var err error
cache, err = lru.NewWithEvict(int(cacheSize), func(_, evictedValue interface{}) {
store := evictedValue.(*connection.CachedClient)
cache, err = lru.NewWithEvict(int(cacheSize), func(_ string, store *connection.CachedClient) {
store.Close()
log.Debug().Str("grpc_conn_evicted", store.Address).Msg("closing grpc connection evicted from pool")
if accessMetrics != nil {
Expand Down
15 changes: 7 additions & 8 deletions engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/onflow/flow/protobuf/go/flow/access"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
Expand All @@ -33,7 +33,7 @@ type backendScripts struct {
connFactory connection.ConnectionFactory
log zerolog.Logger
metrics module.BackendScriptsMetrics
loggedScripts *lru.Cache
loggedScripts *lru.Cache[[md5.Size]byte, time.Time]
archiveAddressList []string
archivePorts []uint
scriptExecValidation bool
Expand Down Expand Up @@ -309,15 +309,14 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes(
}

// shouldLogScript checks if the script hash is unique in the time window
func (b *backendScripts) shouldLogScript(execTime time.Time, scriptHash [16]byte) bool {
rawTimestamp, seen := b.loggedScripts.Get(scriptHash)
if !seen || rawTimestamp == nil {
return true
} else {
func (b *backendScripts) shouldLogScript(execTime time.Time, scriptHash [md5.Size]byte) bool {
timestamp, seen := b.loggedScripts.Get(scriptHash)
if seen {
// safe cast
timestamp := rawTimestamp.(time.Time)
return execTime.Sub(timestamp) >= uniqueScriptLoggingTimeWindow
}
return true

}

func (b *backendScripts) tryExecuteScriptOnExecutionNode(
Expand Down
4 changes: 2 additions & 2 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"time"

lru2 "github.com/hashicorp/golang-lru/v2"
lru "github.com/hashicorp/golang-lru/v2"
accessproto "github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
Expand Down Expand Up @@ -41,7 +41,7 @@ type backendTransactions struct {
previousAccessNodes []accessproto.AccessAPIClient
log zerolog.Logger
nodeCommunicator Communicator
txResultCache *lru2.Cache[flow.Identifier, *access.TransactionResult]
txResultCache *lru.Cache[flow.Identifier, *access.TransactionResult]
}

// SendTransaction forwards the transaction to the collection node
Expand Down
10 changes: 5 additions & 5 deletions engine/access/rpc/connection/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"
"go.uber.org/atomic"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -38,12 +38,12 @@ func (s *CachedClient) Close() {

// Cache represents a cache of CachedClient instances with a given maximum size.
type Cache struct {
cache *lru.Cache
cache *lru.Cache[string, *CachedClient]
size int
}

// NewCache creates a new Cache with the specified maximum size and the underlying LRU cache.
func NewCache(cache *lru.Cache, size int) *Cache {
func NewCache(cache *lru.Cache[string, *CachedClient], size int) *Cache {
return &Cache{
cache: cache,
size: size,
Expand All @@ -57,7 +57,7 @@ func (c *Cache) Get(address string) (*CachedClient, bool) {
if !ok {
return nil, false
}
return val.(*CachedClient), true
return val, true
}

// GetOrAdd atomically gets the CachedClient for the given address from the cache, or adds a new one
Expand All @@ -71,7 +71,7 @@ func (c *Cache) GetOrAdd(address string, timeout time.Duration) (*CachedClient,

val, existed, _ := c.cache.PeekOrAdd(address, client)
if existed {
return val.(*CachedClient), true
return val, true
}

client.Address = address
Expand Down
Loading
Loading