-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Access] Upgrade lru cache #4605
Changes from 5 commits
0f82f8c
3d23bcf
f018aca
8dd08db
b3865c4
ffd8cf7
ecdafd7
d3a5145
8c6fbb4
33410a2
cba02b4
985b558
c19b3a9
fe00dfd
9507585
eb0257c
42897b2
9935534
1e60d8d
0d51ec3
da2c6be
de50262
01d423e
43193d6
77e65b4
ed1b3b2
4cfeafc
fef906c
5b31bfa
ae8ccaa
37bb9a3
33232c2
00768db
507af3e
d7c82aa
3da4a39
3af348b
6aee5d7
33bb8b7
800ec4c
5967df8
47fe825
77678a4
a1d9268
3e75d93
4ef02f3
e169555
72d1330
3eed6df
958e7b8
659e22f
fe6e4b1
770d052
ae57bcf
73ea0e9
a4ae343
a6e33aa
6b95d40
5ff6b10
1e28982
7a2cbb4
fc26728
86a5fa0
56e0307
b69632b
fff2cf2
a59b763
730b46e
2af6f84
2660643
8e4b0fc
689ac6d
c2938e4
eb2ba85
8ff9fb8
d03e611
e3a49d2
41c81a6
f7805a0
4936f8b
9efd131
d6a71cd
b07879a
eb1808a
e29737b
9a69798
3ef9bbe
04e8222
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -8,7 +8,7 @@ import ( | |||||
"time" | ||||||
|
||||||
"github.com/hashicorp/go-multierror" | ||||||
lru "github.com/hashicorp/golang-lru" | ||||||
lru "github.com/hashicorp/golang-lru/v2" | ||||||
"github.com/onflow/flow/protobuf/go/flow/access" | ||||||
execproto "github.com/onflow/flow/protobuf/go/flow/execution" | ||||||
"github.com/rs/zerolog" | ||||||
|
@@ -27,13 +27,14 @@ import ( | |||||
const uniqueScriptLoggingTimeWindow = 10 * time.Minute | ||||||
|
||||||
type backendScripts struct { | ||||||
headers storage.Headers | ||||||
executionReceipts storage.ExecutionReceipts | ||||||
state protocol.State | ||||||
connFactory connection.ConnectionFactory | ||||||
log zerolog.Logger | ||||||
metrics module.BackendScriptsMetrics | ||||||
loggedScripts *lru.Cache | ||||||
headers storage.Headers | ||||||
executionReceipts storage.ExecutionReceipts | ||||||
state protocol.State | ||||||
connFactory connection.ConnectionFactory | ||||||
log zerolog.Logger | ||||||
metrics module.BackendScriptsMetrics | ||||||
loggedScripts *lru.Cache[[16]byte, time.Time] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
|
||||||
archiveAddressList []string | ||||||
archivePorts []uint | ||||||
scriptExecValidation bool | ||||||
|
@@ -310,14 +311,11 @@ func (b *backendScripts) executeScriptOnAvailableExecutionNodes( | |||||
|
||||||
// shouldLogScript checks if the script hash is unique in the time window | ||||||
func (b *backendScripts) shouldLogScript(execTime time.Time, scriptHash [16]byte) bool { | ||||||
rawTimestamp, seen := b.loggedScripts.Get(scriptHash) | ||||||
if !seen || rawTimestamp == nil { | ||||||
timestamp, seen := b.loggedScripts.Get(scriptHash) | ||||||
if !seen { | ||||||
return true | ||||||
} else { | ||||||
// safe cast | ||||||
timestamp := rawTimestamp.(time.Time) | ||||||
return execTime.Sub(timestamp) >= uniqueScriptLoggingTimeWindow | ||||||
} | ||||||
return execTime.Sub(timestamp) >= uniqueScriptLoggingTimeWindow | ||||||
} | ||||||
|
||||||
func (b *backendScripts) tryExecuteScriptOnExecutionNode( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,29 +4,24 @@ import ( | |
"context" | ||
"fmt" | ||
"net" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/sony/gobreaker" | ||
|
||
"go.uber.org/atomic" | ||
"pgregory.net/rapid" | ||
|
||
lru "github.com/hashicorp/golang-lru" | ||
lru "github.com/hashicorp/golang-lru/v2" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here and all other places, we try to use the following import group ordering import (
"std-libs"
"external/packages"
"onflow/other-repo/packages"
"onflow/flow-go/packages"
) the linter will enforce local mode imports There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
"github.com/onflow/flow/protobuf/go/flow/access" | ||
"github.com/onflow/flow/protobuf/go/flow/execution" | ||
"github.com/sony/gobreaker" | ||
"github.com/stretchr/testify/assert" | ||
testifymock "github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/atomic" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/status" | ||
"pgregory.net/rapid" | ||
|
||
"github.com/onflow/flow-go/engine/access/mock" | ||
"github.com/onflow/flow-go/module/metrics" | ||
"github.com/onflow/flow-go/utils/unittest" | ||
) | ||
|
@@ -129,8 +124,8 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) { | |
connectionFactory.CollectionGRPCPort = cn.port | ||
// set the connection pool cache size | ||
cacheSize := 1 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: how about refactoring a bit to move the duplicated code to a helper? func getCache(t *testing.T, cacheSize int) *lru.Cache[string, *CachedClient] {
cache, err := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) {
err := client.Close()
require.NoError(t, err)
})
require.NoError(t, err)
return cache
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
client.Close() | ||
}) | ||
connectionCache := NewCache(cache, cacheSize) | ||
|
||
|
@@ -184,8 +179,8 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) { | |
connectionFactory.ExecutionGRPCPort = en.port | ||
// set the connection pool cache size | ||
cacheSize := 5 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
|
@@ -245,8 +240,8 @@ func TestExecutionNodeClientTimeout(t *testing.T) { | |
connectionFactory.ExecutionNodeGRPCTimeout = timeout | ||
// set the connection pool cache size | ||
cacheSize := 5 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
|
@@ -294,8 +289,8 @@ func TestCollectionNodeClientTimeout(t *testing.T) { | |
connectionFactory.CollectionNodeGRPCTimeout = timeout | ||
// set the connection pool cache size | ||
cacheSize := 5 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
|
@@ -343,9 +338,10 @@ func TestConnectionPoolFull(t *testing.T) { | |
connectionFactory.CollectionGRPCPort = cn1.port | ||
// set the connection pool cache size | ||
cacheSize := 2 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
|
||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
connectionFactory.AccessMetrics = metrics.NewNoopCollector() | ||
|
@@ -419,9 +415,10 @@ func TestConnectionPoolStale(t *testing.T) { | |
connectionFactory.CollectionGRPCPort = cn.port | ||
// set the connection pool cache size | ||
cacheSize := 5 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
|
||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
connectionFactory.AccessMetrics = metrics.NewNoopCollector() | ||
|
@@ -508,9 +505,10 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) { | |
connectionFactory.ExecutionNodeGRPCTimeout = time.Second | ||
// set the connection pool cache size | ||
cacheSize := 1 | ||
cache, _ := lru.NewWithEvict(cacheSize, func(_, evictedValue interface{}) { | ||
evictedValue.(*CachedClient).Close() | ||
cache, _ := lru.NewWithEvict[string, *CachedClient](cacheSize, func(_ string, client *CachedClient) { | ||
client.Close() | ||
}) | ||
|
||
connectionCache := NewCache(cache, cacheSize) | ||
// set metrics reporting | ||
connectionFactory.AccessMetrics = metrics.NewNoopCollector() | ||
|
@@ -592,7 +590,7 @@ func TestExecutionEvictingCacheClients(t *testing.T) { | |
connectionFactory.CollectionNodeGRPCTimeout = 5 * time.Second | ||
// Set the connection pool cache size | ||
cacheSize := 1 | ||
cache, err := lru.New(cacheSize) | ||
cache, err := lru.New[string, *CachedClient](cacheSize) | ||
require.NoError(t, err) | ||
|
||
connectionCache := NewCache(cache, cacheSize) | ||
|
@@ -614,8 +612,7 @@ func TestExecutionEvictingCacheClients(t *testing.T) { | |
ctx := context.Background() | ||
|
||
// Retrieve the cached client from the cache | ||
result, _ := cache.Get(clientAddress) | ||
cachedClient := result.(*CachedClient) | ||
cachedClient, _ := cache.Get(clientAddress) | ||
nozim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Schedule the invalidation of the access API client after a delay | ||
time.AfterFunc(250*time.Millisecond, func() { | ||
|
@@ -671,7 +668,7 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { | |
|
||
// Set the connection pool cache size. | ||
cacheSize := 1 | ||
connectionCache, _ := lru.New(cacheSize) | ||
connectionCache, _ := lru.New[string, *CachedClient](cacheSize) | ||
nozim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
connectionFactory.Manager = NewManager( | ||
NewCache(connectionCache, cacheSize), | ||
|
@@ -755,8 +752,8 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { | |
|
||
// Set the connection pool cache size. | ||
cacheSize := 1 | ||
connectionCache, _ := lru.New(cacheSize) | ||
|
||
connectionCache, _ := lru.New[string, *CachedClient](cacheSize) | ||
nozim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
connectionFactory.Manager = NewManager( | ||
NewCache(connectionCache, cacheSize), | ||
unittest.Logger(), | ||
|
@@ -812,79 +809,3 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { | |
assert.Greater(t, requestTimeout, duration) | ||
assert.Equal(t, nil, err) | ||
} | ||
|
||
// node mocks a flow node that runs a GRPC server | ||
type node struct { | ||
server *grpc.Server | ||
listener net.Listener | ||
port uint | ||
} | ||
|
||
func (n *node) setupNode(t *testing.T) { | ||
n.server = grpc.NewServer() | ||
listener, err := net.Listen("tcp4", unittest.DefaultAddress) | ||
assert.NoError(t, err) | ||
n.listener = listener | ||
assert.Eventually(t, func() bool { | ||
return !strings.HasSuffix(listener.Addr().String(), ":0") | ||
}, time.Second*4, 10*time.Millisecond) | ||
|
||
_, port, err := net.SplitHostPort(listener.Addr().String()) | ||
assert.NoError(t, err) | ||
portAsUint, err := strconv.ParseUint(port, 10, 32) | ||
assert.NoError(t, err) | ||
n.port = uint(portAsUint) | ||
} | ||
|
||
func (n *node) start(t *testing.T) { | ||
// using a wait group here to ensure the goroutine has started before returning. Otherwise, | ||
// there's a race condition where the server is sometimes stopped before it has started | ||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
go func() { | ||
wg.Done() | ||
err := n.server.Serve(n.listener) | ||
assert.NoError(t, err) | ||
}() | ||
unittest.RequireReturnsBefore(t, wg.Wait, 10*time.Millisecond, "could not start goroutine on time") | ||
} | ||
|
||
func (n *node) stop(t *testing.T) { | ||
if n.server != nil { | ||
n.server.Stop() | ||
} | ||
} | ||
|
||
type executionNode struct { | ||
node | ||
handler *mock.ExecutionAPIServer | ||
} | ||
|
||
func (en *executionNode) start(t *testing.T) { | ||
en.setupNode(t) | ||
handler := new(mock.ExecutionAPIServer) | ||
execution.RegisterExecutionAPIServer(en.server, handler) | ||
en.handler = handler | ||
en.node.start(t) | ||
} | ||
|
||
func (en *executionNode) stop(t *testing.T) { | ||
en.node.stop(t) | ||
} | ||
|
||
type collectionNode struct { | ||
node | ||
handler *mock.AccessAPIServer | ||
} | ||
|
||
func (cn *collectionNode) start(t *testing.T) { | ||
cn.setupNode(t) | ||
handler := new(mock.AccessAPIServer) | ||
access.RegisterAccessAPIServer(cn.server, handler) | ||
cn.handler = handler | ||
cn.node.start(t) | ||
} | ||
|
||
func (cn *collectionNode) stop(t *testing.T) { | ||
cn.node.stop(t) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4ef02f3