diff --git a/chain_bridge.go b/chain_bridge.go index d04547f8c..a554858aa 100644 --- a/chain_bridge.go +++ b/chain_bridge.go @@ -31,6 +31,17 @@ const ( maxNumBlocksInCache = 100_000 ) +// cacheableTimestamp is a wrapper around an uint32 that can be used as a value +// in an LRU cache. +type cacheableTimestamp uint32 + +// Size returns the size of the cacheable timestamp. Since we scale the cache by +// the number of items and not the total memory size, we can simply return 1 +// here to count each timestamp as 1 item. +func (c cacheableTimestamp) Size() (uint64, error) { + return 1, nil +} + // LndRpcChainBridge is an implementation of the tapgarden.ChainBridge // interface backed by an active remote lnd node. type LndRpcChainBridge struct { diff --git a/rpcserver.go b/rpcserver.go index 161b4deff..11f9effc4 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -100,10 +100,6 @@ const ( ) type ( - // cacheableTimestamp is a wrapper around a uint32 that can be used as a - // value in an LRU cache. - cacheableTimestamp uint32 - // devSendEventStream is a type alias for the asset send event // notification stream. devSendEventStream = tapdevrpc.TapDev_SubscribeSendAssetEventNtfnsServer @@ -148,13 +144,6 @@ type ( } ) -// Size returns the size of the cacheable timestamp. Since we scale the cache by -// the number of items and not the total memory size, we can simply return 1 -// here to count each timestamp as 1 item. -func (c cacheableTimestamp) Size() (uint64, error) { - return 1, nil -} - // rpcServer is the main RPC server for the Taproot Assets daemon that handles // gRPC/REST/Websockets incoming requests. type rpcServer struct { @@ -4592,7 +4581,9 @@ func (r *rpcServer) AssetRoots(ctx context.Context, } resp := &unirpc.AssetRootResponse{ - UniverseRoots: make(map[string]*unirpc.UniverseRoot), + UniverseRoots: make( + map[string]*unirpc.UniverseRoot, len(assetRoots), + ), } // Retrieve config for use in filtering asset roots based on sync export @@ -4770,7 +4761,7 @@ func (r *rpcServer) QueryAssetRoots(ctx context.Context, return nil, err } - // Query for both a issaunce and transfer universe root. + // Query for both an issuance and transfer universe root. assetRoots, err := r.queryAssetProofRoots(ctx, universeID) if err != nil { return nil, err @@ -4960,7 +4951,7 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context, } if req.Limit > universe.MaxPageSize || req.Limit < 0 { - return nil, fmt.Errorf("invalid request limit") + return nil, fmt.Errorf("invalid request limit: %d", req.Limit) } // Check the rate limiter to see if we need to wait at all. If not then diff --git a/sample-tapd.conf b/sample-tapd.conf index d15527e75..25b63ec8e 100644 --- a/sample-tapd.conf +++ b/sample-tapd.conf @@ -310,6 +310,28 @@ ; The burst budget for the universe query rate limiting ; universe.req-burst-budget=10 +[multiverse-caches] + +; The number of proofs that are cached per universe. (default: 5) +; universe.multiverse-caches.proofs-per-universe=5 + +; The number of universes that can have a cache of leaf keys. (default: 2000) +; universe.multiverse-caches.leaves-num-cached-universes=2000 + +; The number of leaf keys that are cached per cached universe. (default: 50) +; universe.multiverse-caches.leaves-per-universe=50 + +; If the syncer cache is enabled. +; universe.multiverse-caches.syncer-cache-enabled=false + +; The pre-allocated size of the syncer cache. (default: 100000) +; universe.multiverse-caches.syncer-cache-pre-alloc-size=100000 + +; The size of the root node page cache for all requests that aren't served by +; the syncer cache. (default: 10240) +; universe.multiverse-caches.root-node-page-cache-size=10240 + + [address] ; If true, tapd will not try to sync issuance proofs for unknown assets when diff --git a/tapcfg/config.go b/tapcfg/config.go index d2dcc4062..4635de007 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -21,6 +21,7 @@ import ( "github.com/jessevdk/go-flags" "github.com/lightninglabs/lndclient" tap "github.com/lightninglabs/taproot-assets" + "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/monitoring" "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/rfq" @@ -286,6 +287,8 @@ type UniverseConfig struct { UniverseQueriesPerSecond rate.Limit `long:"max-qps" description:"The maximum number of queries per second across the set of active universe queries that is permitted. Anything above this starts to get rate limited."` UniverseQueriesBurst int `long:"req-burst-budget" description:"The burst budget for the universe query rate limiting."` + + MultiverseCaches *tapdb.MultiverseCacheConfig `group:"multiverse-caches" namespace:"multiverse-caches"` } // AddrBookConfig is the config that houses any address Book related config @@ -431,6 +434,9 @@ func DefaultConfig() Config { defaultUniverseMaxQps, ), UniverseQueriesBurst: defaultUniverseQueriesBurst, + MultiverseCaches: fn.Ptr( + tapdb.DefaultMultiverseCacheConfig(), + ), }, AddrBook: &AddrBookConfig{ DisableSyncer: false, diff --git a/tapcfg/server.go b/tapcfg/server.go index 31364185d..65279b425 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -126,7 +126,9 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, return db.WithTx(tx) }, ) - multiverse := tapdb.NewMultiverseStore(multiverseDB) + multiverse := tapdb.NewMultiverseStore( + multiverseDB, tapdb.DefaultMultiverseStoreConfig(), + ) uniStatsDB := tapdb.NewTransactionExecutor( db, func(tx *sql.Tx) tapdb.UniverseStatsStore { diff --git a/tapdb/multiverse.go b/tapdb/multiverse.go index 85a023272..e6c65ff66 100644 --- a/tapdb/multiverse.go +++ b/tapdb/multiverse.go @@ -3,24 +3,19 @@ package tapdb import ( "bytes" "context" - "crypto/sha256" "database/sql" "errors" "fmt" - "sync" - "sync/atomic" "time" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2/schnorr" - "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/mssmt" "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightninglabs/taproot-assets/universe" - "github.com/lightningnetwork/lnd/lnutils" ) const ( @@ -40,8 +35,10 @@ var ( ) type ( + // BaseUniverseRoot is the type returned from the UniverseRoots query. BaseUniverseRoot = sqlc.UniverseRootsRow + // UniverseRootsParams are the parameters for the UniverseRoots query. UniverseRootsParams = sqlc.UniverseRootsParams // MultiverseRoot is the root of a multiverse tree. Two trees exist: @@ -104,409 +101,19 @@ type BatchedMultiverse interface { BatchedTx[BaseMultiverseStore] } -// ProofKey is used to uniquely identify a proof within a universe. This is -// used for the LRU cache for the proofs themselves, which are considered to be -// immutable. -type ProofKey [32]byte - -// NewProofKey takes a universe identifier and leaf key, and returns a proof -// key. -func NewProofKey(id universe.Identifier, key universe.LeafKey) ProofKey { - idBytes := id.Bytes() - leafKeyBytes := key.UniverseKey() - - // The proof key maps down the ID and the leaf key into a single - // 32-byte value: sha256(id || leaf_key).. - h := sha256.New() - h.Write(idBytes[:]) - h.Write(leafKeyBytes[:]) - - return fn.ToArray[ProofKey](h.Sum(nil)) -} - -// numCachedProofs is the number of universe proofs we'll cache. -const numCachedProofs = 50_000 - -// cachedProof is a single cached proof. -type cachedProof []*universe.Proof - -// Size just returns 1 as we're limiting based on the total number of proofs. -func (c *cachedProof) Size() (uint64, error) { - return 1, nil -} - -// leafProofCache is used to cache proofs for issuance leaves for assets w/o a -// group key. -type leafProofCache = *lru.Cache[ProofKey, *cachedProof] - -// newLeafCache creates a new leaf proof cache. -func newLeafCache() leafProofCache { - return lru.NewCache[ProofKey, *cachedProof]( - numCachedProofs, - ) -} - -// treeID is used to uniquely identify a multiverse tree. -type treeID string - -// proofCache a map of proof caches for each proof type. -type proofCache struct { - lnutils.SyncMap[treeID, leafProofCache] - - *cacheLogger -} - -// newProofCache creates a new proof cache. -func newProofCache() *proofCache { - return &proofCache{ - SyncMap: lnutils.SyncMap[treeID, leafProofCache]{}, - cacheLogger: newCacheLogger("universe_proofs"), - } -} - -// fetchProof reads the cached proof for the given ID and leaf key. -func (p *proofCache) fetchProof(id universe.Identifier, - leafKey universe.LeafKey) []*universe.Proof { - - // First, get the sub-cache for this universe ID from the map of - // caches. - idStr := treeID(id.String()) - assetProofCache, _ := p.LoadOrStore(idStr, newLeafCache()) - - // With that lower level cache obtained, we can check to see if we have - // a hit or not. - proofKey := NewProofKey(id, leafKey) - proof, err := assetProofCache.Get(proofKey) - if err == nil { - p.Hit() - return *proof - } - - p.Miss() - - return nil -} - -// insertProof inserts the given proof into the cache. -func (p *proofCache) insertProof(id universe.Identifier, - leafKey universe.LeafKey, proof []*universe.Proof) { - - idStr := treeID(id.String()) - - assetProofCache, _ := p.LoadOrStore(idStr, newLeafCache()) - - proofKey := NewProofKey(id, leafKey) - - log.Debugf("storing proof for %v+%v in cache, key=%x", - id.StringForLog(), leafKey, proofKey[:]) - - proofVal := cachedProof(proof) - if _, err := assetProofCache.Put(proofKey, &proofVal); err != nil { - log.Errorf("unable to insert into proof cache: %v", err) - } -} - -// delProofsForAsset deletes all the proofs for the given asset. -func (p *proofCache) delProofsForAsset(id universe.Identifier) { - log.Debugf("wiping proofs for %v from cache", id) - - idStr := treeID(id.String()) - p.Delete(idStr) -} - -// rootPageQuery is a wrapper around a query to fetch all the roots, but with -// pagination parameters. -type rootPageQuery struct { - withAmountsById bool - leafQuery -} - -// newRootPageQuery creates a new root page query. -func newRootPageQuery(q universe.RootNodesQuery) rootPageQuery { - return rootPageQuery{ - withAmountsById: q.WithAmountsById, - leafQuery: leafQuery{ - sortDirection: q.SortDirection, - offset: q.Offset, - limit: q.Limit, - }, - } -} - -// universeRootPage is a single page of roots. -type universeRootPage []universe.Root - -// Size is the amount of roots in the page. -func (u universeRootPage) Size() (uint64, error) { - return uint64(len(u)), nil -} - -// rootPageCache is used to store the latest root pages for a given treeID. -type rootPageCache = lru.Cache[rootPageQuery, universeRootPage] - -// atomicRootCache is an atomic pointer to a root cache. -type atomicRootCache = atomic.Pointer[rootPageCache] - -// newAtomicRootCache creates a new atomic root cache. -func newAtomicRootCache() *atomicRootCache { - rootCache := lru.NewCache[rootPageQuery, universeRootPage]( - numCachedProofs, - ) - - var a atomicRootCache - a.Store(rootCache) - - return &a -} - -// rootIndex maps a tree ID to a universe root. -type rootIndex = lnutils.SyncMap[treeID, *universe.Root] - -// atomicRootIndex is an atomic pointer to a root index. -type atomicRootIndex = atomic.Pointer[rootIndex] - -// newAtomicRootIndex creates a new atomic root index. -func newAtomicRootIndex() *atomicRootIndex { - var a atomicRootIndex - a.Store(&rootIndex{}) - - return &a -} - -// rootNodeCache is used to cache the set of active root nodes for the -// multiverse tree. -type rootNodeCache struct { - sync.RWMutex - - rootIndex *atomicRootIndex - - allRoots *atomicRootCache - - *cacheLogger - - // TODO(roasbeef): cache for issuance vs transfer roots? -} - -// newRootNodeCache creates a new root node cache. -func newRootNodeCache() *rootNodeCache { - return &rootNodeCache{ - rootIndex: newAtomicRootIndex(), - allRoots: newAtomicRootCache(), - cacheLogger: newCacheLogger("universe_roots"), - } -} - -// fetchRoots reads the cached roots for the given proof type. If the amounts -// are needed, then we return nothing so we go to the database to fetch the -// information. -func (r *rootNodeCache) fetchRoots(q universe.RootNodesQuery, - haveWriteLock bool) []universe.Root { - - // If we have the write lock already, no need to fetch it. - if !haveWriteLock { - r.RLock() - defer r.RUnlock() - } - - // Attempt to read directly from the root node cache. - rootNodeCache := r.allRoots.Load() - rootNodes, _ := rootNodeCache.Get(newRootPageQuery(q)) - - if len(rootNodes) > 0 { - r.Hit() - } else { - r.Miss() - } - - return rootNodes +// MultiverseStoreConfig is the set of configuration options for the multiverse +// store. +type MultiverseStoreConfig struct { + // Caches is the set of cache configurations for the multiverse store. + Caches MultiverseCacheConfig } -// fetchRoot reads the cached root for the given ID. -func (r *rootNodeCache) fetchRoot(id universe.Identifier) *universe.Root { - rootIndex := r.rootIndex.Load() - - root, ok := rootIndex.Load(treeID(id.String())) - if ok { - r.Hit() - return root +// DefaultMultiverseStoreConfig returns the default configuration for the +// multiverse store. +func DefaultMultiverseStoreConfig() *MultiverseStoreConfig { + return &MultiverseStoreConfig{ + Caches: DefaultMultiverseCacheConfig(), } - - r.Miss() - - return nil -} - -// cacheRoot stores the given root in the cache. -func (r *rootNodeCache) cacheRoot(id universe.Identifier, - root universe.Root) { - - rootIndex := r.rootIndex.Load() - rootIndex.Store(treeID(id.String()), &root) -} - -// cacheRoots stores the given roots in the cache. -func (r *rootNodeCache) cacheRoots(q universe.RootNodesQuery, - rootNodes []universe.Root) { - - log.Debugf("caching num_roots=%v", len(rootNodes)) - - // Store the main root pointer, then update the root index. - rootPageCache := r.allRoots.Load() - _, err := rootPageCache.Put(newRootPageQuery(q), rootNodes) - if err != nil { - log.Errorf("unable to insert into root cache: %v", err) - } - - rootIndex := r.rootIndex.Load() - for _, rootNode := range rootNodes { - rootNode := rootNode - - idStr := treeID(rootNode.ID.String()) - rootIndex.Store(idStr, &rootNode) - } -} - -// wipeCache wipes all the cached roots. -func (r *rootNodeCache) wipeCache() { - log.Debugf("wiping universe cache") - - rootCache := lru.NewCache[rootPageQuery, universeRootPage]( - numCachedProofs, - ) - r.allRoots.Store(rootCache) - - r.rootIndex.Store(&rootIndex{}) -} - -// cachedLeafKeys is used to cache the set of leaf keys for a given universe. -// -// TODO(roasbeef); cacheable[T] -type cachedLeafKeys []universe.LeafKey - -// Size just returns 1, as we cache based on the total number of assets, but -// not the sum of their leaves. -func (c cachedLeafKeys) Size() (uint64, error) { - return uint64(1), nil -} - -// numMaxCachedPages is the maximum number of pages we'll cache for a given -// page cache. Each page is 512 items, so we'll cache 10 of them, up to 5,120 -// for a given namespace. -const numMaxCachedPages = 1000 - -// leafQuery is a wrapper around the existing UniverseLeafKeysQuery struct that -// doesn't include a pointer so it can be safely used as a map key. -type leafQuery struct { - sortDirection universe.SortDirection - offset int32 - limit int32 -} - -// newLeafQuery creates a new leaf query. -func newLeafQuery(q universe.UniverseLeafKeysQuery) leafQuery { - return leafQuery{ - sortDirection: q.SortDirection, - offset: q.Offset, - limit: q.Limit, - } -} - -// leafPageCache caches the various paginated responses for a given treeID. -type leafPageCache struct { - *lru.Cache[leafQuery, *cachedLeafKeys] -} - -// Size returns the number of elements in the leaf page cache. -func (l *leafPageCache) Size() (uint64, error) { - return uint64(l.Len()), nil -} - -// leafKeysCache is used to cache the set of leaf keys for a given universe. -// For each treeID we store an inner cache for the paginated responses. -type leafKeysCache = lru.Cache[treeID, *leafPageCache] - -// universeLeafCaches is used to cache the set of leaf keys for a given -// universe. -type universeLeafCache struct { - sync.Mutex - - leafCache *leafKeysCache - - *cacheLogger -} - -// newUniverseLeafCache creates a new universe leaf cache. -func newUniverseLeafCache() *universeLeafCache { - return &universeLeafCache{ - leafCache: lru.NewCache[treeID, *leafPageCache]( - numCachedProofs, - ), - cacheLogger: newCacheLogger("universe_leaf_keys"), - } -} - -// fetchLeafKeys reads the cached leaf keys for the given ID. -func (u *universeLeafCache) fetchLeafKeys(q universe.UniverseLeafKeysQuery, -) []universe.LeafKey { - - idStr := treeID(q.Id.String()) - - leafPageCache, err := u.leafCache.Get(idStr) - if err == nil { - leafKeys, err := leafPageCache.Get(newLeafQuery(q)) - if err == nil { - u.Hit() - log.Tracef("read leaf keys for %v from cache", - q.Id.StringForLog()) - return *leafKeys - } - } - - u.Miss() - - return nil -} - -// cacheLeafKeys stores the given leaf keys in the cache. -func (u *universeLeafCache) cacheLeafKeys(q universe.UniverseLeafKeysQuery, - keys []universe.LeafKey) { - - cachedKeys := cachedLeafKeys(keys) - - idStr := treeID(q.Id.String()) - - log.Debugf("storing leaf keys for %v in cache", q.Id.StringForLog()) - - pageCache, err := u.leafCache.Get(idStr) - if err != nil { - // No page cache yet, so we'll create one now. - pageCache = &leafPageCache{ - Cache: lru.NewCache[leafQuery, *cachedLeafKeys]( - numMaxCachedPages, - ), - } - - // Store the cache in the top level cache. - if _, err := u.leafCache.Put(idStr, pageCache); err != nil { - // If we encounter an error here, we'll exit to avoid a - // panic below. - log.Errorf("unable to store entry in page cache: %v", - err) - return - } - } - - // Add the to the page cache. - if _, err := pageCache.Put(newLeafQuery(q), &cachedKeys); err != nil { - log.Errorf("unable to store leaf resp: %v", err) - } -} - -// wipeCache wipes the cache of leaf keys for a given universe ID. -func (u *universeLeafCache) wipeCache(id treeID) { - log.Debugf("wiping leaf keys for %x in cache", id) - - u.leafCache.Delete(id) } // MultiverseStore implements the persistent storage for a multiverse. @@ -515,11 +122,15 @@ func (u *universeLeafCache) wipeCache(id treeID) { type MultiverseStore struct { db BatchedMultiverse + cfg *MultiverseStoreConfig + + syncerCache *syncerRootNodeCache + rootNodeCache *rootNodeCache - proofCache *proofCache + proofCache *universeProofCache - leafKeysCache *universeLeafCache + leafKeysCache *universeLeafPageCache // transferProofDistributor is an event distributor that will be used to // notify subscribers about new proof leaves that are added to the @@ -530,12 +141,26 @@ type MultiverseStore struct { } // NewMultiverseStore creates a new multiverse DB store handle. -func NewMultiverseStore(db BatchedMultiverse) *MultiverseStore { +func NewMultiverseStore(db BatchedMultiverse, + cfg *MultiverseStoreConfig) *MultiverseStore { + return &MultiverseStore{ - db: db, - rootNodeCache: newRootNodeCache(), - proofCache: newProofCache(), - leafKeysCache: newUniverseLeafCache(), + db: db, + cfg: cfg, + syncerCache: newSyncerRootNodeCache( + cfg.Caches.SyncerCacheEnabled, + cfg.Caches.SyncerCachePreAllocSize, + ), + rootNodeCache: newRootNodeCache( + cfg.Caches.RootNodePageCacheSize, + ), + proofCache: newUniverseProofCache( + cfg.Caches.ProofsPerUniverse, + ), + leafKeysCache: newUniverseLeafPageCache( + cfg.Caches.LeavesNumCachedUniverses, + cfg.Caches.LeavesPerUniverse, + ), transferProofDistributor: fn.NewEventDistributor[proof.Blob](), } } @@ -608,20 +233,52 @@ func (b *MultiverseStore) MultiverseRootNode(ctx context.Context, func (b *MultiverseStore) UniverseRootNode(ctx context.Context, id universe.Identifier) (universe.Root, error) { - // First, we'll check the root node cache to see if we already have it. - rootNode := b.rootNodeCache.fetchRoot(id) + // For an individual universe root node, we always fetch it from the + // syncer cache, as that should have all root nodes that are currently + // known. We never update the syncer cache on a cache miss of a single + // root node, as that shouldn't happen (unless the cache is empty). + // This will always return nil if the cache is disabled, so we don't + // need an extra indentation for that check here. + rootNode := b.syncerCache.fetchRoot(id) if rootNode != nil { return *rootNode, nil } - b.rootNodeCache.Lock() - defer b.rootNodeCache.Unlock() + // If the cache is still empty, we'll populate it now, given it is + // enabled. + if b.syncerCache.isEmpty() && b.cfg.Caches.SyncerCacheEnabled { + // We attempt to acquire the write lock to fill the cache. If + // another goroutine is already filling the cache, we'll wait + // for it to finish that way. + b.syncerCache.Lock() + defer b.syncerCache.Unlock() + + // Because another goroutine might have filled the cache while + // we were waiting for the lock, we'll check again if the item + // is now in the cache. + rootNode = b.syncerCache.fetchRoot(id) + if rootNode != nil { + return *rootNode, nil + } - // Check to see if the cache was populated while we were waiting for - // the lock. - rootNode = b.rootNodeCache.fetchRoot(id) - if rootNode != nil { - return *rootNode, nil + // Populate the cache with all the root nodes. + err := b.fillSyncerCache(ctx) + if err != nil { + return universe.Root{}, fmt.Errorf("error filling "+ + "syncer cache: %w", err) + } + + // We now try again to fetch the root node from the cache. + rootNode = b.syncerCache.fetchRoot(id) + if rootNode != nil { + return *rootNode, nil + } + + // Still no luck with the cache (this should really never + // happen), so we'll go to the secondary cache or the disk to + // fetch it. + log.Warnf("Fetching root node from disk for id %v, cache miss "+ + "even after filling the cache", id) } var universeRoot UniverseRoot @@ -658,8 +315,6 @@ func (b *MultiverseStore) UniverseRootNode(ctx context.Context, AssetName: universeRoot.AssetName, } - b.rootNodeCache.cacheRoot(id, dbRoot) - return dbRoot, nil } @@ -673,7 +328,7 @@ func (b *MultiverseStore) UniverseLeafKeys(ctx context.Context, return leafKeys, nil } - // The leaves wasn't populated, so we'll go to disk to fetch it. + // The leaves weren't populated, so we'll go to disk to fetch it. b.leafKeysCache.Lock() defer b.leafKeysCache.Unlock() @@ -710,7 +365,62 @@ func (b *MultiverseStore) UniverseLeafKeys(ctx context.Context, func (b *MultiverseStore) RootNodes(ctx context.Context, q universe.RootNodesQuery) ([]universe.Root, error) { - // Attempt to read directly from the root node cache. + // Is this a query for the syncer cache (ascending and + // WithAmountsById=false)? This cache is complete (all root nodes) and + // can be directly sliced into, but it doesn't have any amounts by ID + // and doesn't support descending order. + if isQueryForSyncerCache(q) && b.cfg.Caches.SyncerCacheEnabled { + // First, check to see if we have the root nodes cached in the + // syncer cache. + rootNodes, emptyPage := b.syncerCache.fetchRoots(q, false) + if len(rootNodes) > 0 || emptyPage { + return rootNodes, nil + } + + // If the cache is still empty, we'll populate it now. + if b.syncerCache.isEmpty() { + // We attempt to acquire the write lock to fill the + // cache. If another goroutine is already filling the + // cache, we'll wait for it to finish that way. + b.syncerCache.Lock() + defer b.syncerCache.Unlock() + + // Because another goroutine might have filled the cache + // while we were waiting for the lock, we'll check again + // if the item is now in the cache. + rootNodes, emptyPage = b.syncerCache.fetchRoots(q, true) + if len(rootNodes) > 0 || emptyPage { + return rootNodes, nil + } + + // Populate the cache with all the root nodes. + err := b.fillSyncerCache(ctx) + if err != nil { + return nil, fmt.Errorf("error filling syncer "+ + "cache: %w", err) + } + + // We now try again to fetch the root nodes page from + // the cache. + rootNodes, emptyPage = b.syncerCache.fetchRoots(q, true) + if len(rootNodes) > 0 || emptyPage { + return rootNodes, nil + } + + // Still no luck with the cache (this should really + // never happen), so we'll go to the secondary cache or + // disk to fetch it. + log.Warnf("Fetching root nodes page from disk for "+ + "query %v, cache miss even after filling the "+ + "cache", q) + } + } + + // Attempt to read directly from the root node cache next. This + // secondary cache only contains the last few pages of root nodes that + // were queried with parameters the syncer doesn't use. This might serve + // some UI requests where the first few pages are queried multiple + // times, so an LRU based cache that's smaller makes sense.. rootNodes := b.rootNodeCache.fetchRoots(q, false) if len(rootNodes) > 0 { log.Tracef("read %d root nodes from cache", len(rootNodes)) @@ -731,11 +441,6 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, now := time.Now() log.Infof("populating root cache...") - var ( - uniRoots []universe.Root - readTx = NewBaseMultiverseReadTx() - ) - params := sqlc.UniverseRootsParams{ SortDirection: sqlInt16(q.SortDirection), NumOffset: q.Offset, @@ -747,7 +452,29 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, return q.Limit }(), } + uniRoots, err := b.queryRootNodes(ctx, params, q.WithAmountsById) + if err != nil { + return nil, err + } + log.Debugf("Populating %v root nodes into cache, took=%v", + len(uniRoots), time.Since(now)) + + // Cache all the root nodes we just read from the database. + b.rootNodeCache.cacheRoots(q, uniRoots) + + return uniRoots, nil +} + +// queryRootNodes returns the set of root nodes for the given query parameters. +func (b *MultiverseStore) queryRootNodes(ctx context.Context, + params sqlc.UniverseRootsParams, + withAmountsByID bool) ([]universe.Root, error) { + + var ( + uniRoots []universe.Root + readTx = NewBaseMultiverseReadTx() + ) dbErr := b.db.ExecTx(ctx, &readTx, func(db BaseMultiverseStore) error { dbRoots, err := db.UniverseRoots(ctx, params) if err != nil { @@ -785,7 +512,7 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, // We skip the grouped assets if that wasn't explicitly // requested by the user, saves us some calls for // grouped assets. - if dbRoot.GroupKey != nil && q.WithAmountsById { + if dbRoot.GroupKey != nil && withAmountsByID { groupLeaves, err := db.QueryUniverseLeaves( ctx, UniverseLeafQuery{ Namespace: id.String(), @@ -803,7 +530,7 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, copy(id[:], leaf.AssetID) groupedAssets[id] = uint64(leaf.SumAmt) } - } else if q.WithAmountsById { + } else if withAmountsByID { // For non-grouped assets, there's exactly one // member, the asset itself. groupedAssets = map[asset.ID]uint64{ @@ -831,13 +558,47 @@ func (b *MultiverseStore) RootNodes(ctx context.Context, return nil, dbErr } - log.Debugf("Populating %v root nodes into cache, took=%v", - len(uniRoots), time.Since(now)) + return uniRoots, nil +} - // Cache all the root nodes we just read from the database. - b.rootNodeCache.cacheRoots(q, uniRoots) +// fillSyncerCache populates the syncer cache with all the root nodes that are +// currently known. This is used to quickly serve the syncer with the root nodes +// it needs to sync the multiverse. +// +// NOTE: This method must be called while holding the syncer cache lock. +func (b *MultiverseStore) fillSyncerCache(ctx context.Context) error { + now := time.Now() + log.Infof("Populating syncer root cache...") - return uniRoots, nil + params := sqlc.UniverseRootsParams{ + SortDirection: sqlInt16(universe.SortAscending), + NumOffset: 0, + NumLimit: universe.MaxPageSize, + } + + allRoots := make( + []universe.Root, 0, b.cfg.Caches.SyncerCachePreAllocSize, + ) + for { + newRoots, err := b.queryRootNodes(ctx, params, false) + if err != nil { + return err + } + + allRoots = append(allRoots, newRoots...) + params.NumOffset += universe.MaxPageSize + + if len(newRoots) < universe.MaxPageSize { + break + } + } + + log.Debugf("Populating %v root nodes into syncer cache, took=%v", + len(allRoots), time.Since(now)) + + b.syncerCache.replaceCache(allRoots) + + return nil } // FetchProofLeaf returns a proof leaf for the target key. If the key doesn't @@ -849,9 +610,9 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, universeKey universe.LeafKey) ([]*universe.Proof, error) { // First, check the cached to see if we already have this proof. - proof := b.proofCache.fetchProof(id, universeKey) - if len(proof) > 0 { - return proof, nil + proofsFromCache := b.proofCache.fetchProof(id, universeKey) + if len(proofsFromCache) > 0 { + return proofsFromCache, nil } var ( @@ -864,11 +625,9 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, return nil, err } - dbErr := b.db.ExecTx(ctx, &readTx, func(dbTx BaseMultiverseStore) error { + dbErr := b.db.ExecTx(ctx, &readTx, func(tx BaseMultiverseStore) error { var err error - proofs, err = universeFetchProofLeaf( - ctx, id, universeKey, dbTx, - ) + proofs, err = universeFetchProofLeaf(ctx, id, universeKey, tx) if err != nil { return err } @@ -877,7 +636,7 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, // // Retrieve a handle to the multiverse MS-SMT tree. multiverseTree := mssmt.NewCompactedTree( - newTreeStoreWrapperTx(dbTx, multiverseNS), + newTreeStoreWrapperTx(tx, multiverseNS), ) multiverseRoot, err := multiverseTree.Root(ctx) @@ -900,6 +659,8 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, for i := range proofs { proofs[i].MultiverseRoot = multiverseRoot + + //nolint:lll proofs[i].MultiverseInclusionProof = multiverseInclusionProof } @@ -909,8 +670,8 @@ func (b *MultiverseStore) FetchProofLeaf(ctx context.Context, return nil, dbErr } - // Insert the proof we just read up into the main cache. - b.proofCache.insertProof(id, universeKey, proofs) + // Insert the proofs we just read up into the main cache. + b.proofCache.insertProofs(id, universeKey, proofs) return proofs, nil } @@ -992,8 +753,7 @@ func (b *MultiverseStore) FetchProof(ctx context.Context, // UpsertProofLeaf upserts a proof leaf within the multiverse tree and the // universe tree that corresponds to the given key. func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context, - id universe.Identifier, key universe.LeafKey, - leaf *universe.Leaf, + id universe.Identifier, key universe.LeafKey, leaf *universe.Leaf, metaReveal *proof.MetaReveal) (*universe.Proof, error) { var ( @@ -1019,12 +779,15 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context, return nil, dbErr } - idStr := treeID(id.String()) - // Invalidate the cache since we just updated the root. b.rootNodeCache.wipeCache() b.proofCache.delProofsForAsset(id) - b.leafKeysCache.wipeCache(idStr) + b.leafKeysCache.wipeCache(id.String()) + b.syncerCache.addOrReplace(universe.Root{ + ID: id, + AssetName: leaf.Asset.Tag, + Node: issuanceProof.MultiverseRoot, + }) // Notify subscribers about the new proof leaf, now that we're sure we // have written it to the database. But we only care about transfer @@ -1043,30 +806,31 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context, items []*universe.Item) error { insertProof := func(item *universe.Item, - dbTx BaseMultiverseStore) error { + dbTx BaseMultiverseStore) (*universe.Proof, error) { // Upsert proof leaf into the asset (group) specific universe // tree. - _, err := universeUpsertProofLeaf( + return universeUpsertProofLeaf( ctx, dbTx, item.ID, item.Key, item.Leaf, item.MetaReveal, ) - if err != nil { - return err - } - - return nil } - var writeTx BaseMultiverseOptions + var ( + writeTx BaseMultiverseOptions + uniProofs []*universe.Proof + ) dbErr := b.db.ExecTx( ctx, &writeTx, func(store BaseMultiverseStore) error { + uniProofs = make([]*universe.Proof, len(items)) for idx := range items { item := items[idx] - err := insertProof(item, store) + uniProof, err := insertProof(item, store) if err != nil { return err } + + uniProofs[idx] = uniProof } return nil @@ -1090,12 +854,21 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context, items[idx].Leaf.RawProof, ) } + + // Update the syncer cache with the new root node. + b.syncerCache.addOrReplace(universe.Root{ + ID: items[idx].ID, + AssetName: items[idx].Leaf.Asset.Tag, + Node: uniProofs[idx].UniverseRoot, + }) } // Invalidate the root node cache for all the assets we just inserted. - idsToDelete := fn.NewSet(fn.Map(items, func(item *universe.Item) treeID { - return treeID(item.ID.String()) - })...) + idsToDelete := fn.NewSet( + fn.Map(items, func(item *universe.Item) universeIDKey { + return item.ID.String() + })..., + ) for id := range idsToDelete { b.proofCache.Delete(id) @@ -1111,14 +884,14 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context, var writeTx BaseUniverseStoreOptions - dbErr := b.db.ExecTx(ctx, &writeTx, func(dbTx BaseMultiverseStore) error { + dbErr := b.db.ExecTx(ctx, &writeTx, func(tx BaseMultiverseStore) error { multiverseNS, err := namespaceForProof(id.ProofType) if err != nil { return err } multiverseTree := mssmt.NewCompactedTree( - newTreeStoreWrapperTx(dbTx, multiverseNS), + newTreeStoreWrapperTx(tx, multiverseNS), ) multiverseLeafKey := id.Bytes() @@ -1127,7 +900,7 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context, return err } - return deleteUniverseTree(ctx, dbTx, id) + return deleteUniverseTree(ctx, tx, id) }) if dbErr != nil { return "", dbErr @@ -1136,9 +909,9 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context, // Wipe the cache items from this node. b.rootNodeCache.wipeCache() - idStr := treeID(id.String()) - b.proofCache.Delete(idStr) - b.leafKeysCache.wipeCache(idStr) + b.proofCache.Delete(id.String()) + b.leafKeysCache.wipeCache(id.String()) + b.syncerCache.remove(id.Key()) return id.String(), dbErr } diff --git a/tapdb/multiverse_cache.go b/tapdb/multiverse_cache.go new file mode 100644 index 000000000..60bdd9550 --- /dev/null +++ b/tapdb/multiverse_cache.go @@ -0,0 +1,698 @@ +package tapdb + +import ( + "bytes" + "crypto/sha256" + "slices" + "sort" + "sync" + "sync/atomic" + + "github.com/lightninglabs/neutrino/cache/lru" + "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/universe" + "github.com/lightningnetwork/lnd/lnutils" +) + +// MultiverseCacheConfig is the configuration for the different multiverse +// caches that exist. +// +//nolint:lll +type MultiverseCacheConfig struct { + // ProofsPerUniverse is the number of proofs that are cached per + // universe. This number needs to be multiplied by the total number of + // universes to get the total number of proofs that are cached. There is + // no limit to the number of universes that can hold cached keys, so a + // cache is created for each universe that receives a request. + ProofsPerUniverse uint64 `long:"proofs-per-universe" description:"The number of proofs that are cached per universe."` + + // LeavesNumCachedUniverses is the number of universes that can have a + // cache of leaf keys. Each cached universe can have up to + // LeavesPerUniverse keys cached. The total number of cached keys is + // therefore LeavesNumCachedUniverses * LeavesPerUniverse. + LeavesNumCachedUniverses uint64 `long:"leaves-num-cached-universes" description:"The number of universes that can have a cache of leaf keys."` + + // LeavesPerUniverse is the number of leaf keys that are cached per + // universe. This number needs to be multiplied by + // LeavesNumCachedUniverses to get the total number of leaf keys that + // are cached. + LeavesPerUniverse uint64 `long:"leaves-per-universe" description:"The number of leaf keys that are cached per cached universe."` + + // SyncerCacheEnabled is a flag that indicates if the syncer cache is + // enabled. The syncer cache is used to cache the set of active root + // nodes for the multiverse tree, which is specifically kept for the + // universe sync. + SyncerCacheEnabled bool `long:"syncer-cache-enabled" description:"If the syncer cache is enabled."` + + // SyncerCachePreAllocSize is the pre-allocated size of the syncer + // cache. + SyncerCachePreAllocSize uint64 `long:"syncer-cache-pre-alloc-size" description:"The pre-allocated size of the syncer cache."` + + // RootNodePageCacheSize is the size of the root node page cache that + // serves all paginated queries for root nodes that use different + // parameters than the syncer cache. + RootNodePageCacheSize uint64 `long:"root-node-page-cache-size" description:"The size of the root node page cache for all requests that aren't served by the syncer cache."` +} + +// DefaultMultiverseCacheConfig returns the default configuration for the +// multiverse cache. +func DefaultMultiverseCacheConfig() MultiverseCacheConfig { + return MultiverseCacheConfig{ + ProofsPerUniverse: 5, + LeavesNumCachedUniverses: 2_000, + LeavesPerUniverse: 50, + SyncerCacheEnabled: false, + SyncerCachePreAllocSize: 100_000, + RootNodePageCacheSize: 20 * universe.MaxPageSize, + } +} + +// ProofKey is used to uniquely identify a proof within a universe. This is +// used for the LRU cache for the proofs themselves, which are considered to be +// immutable. +type ProofKey [32]byte + +// NewProofKey takes a universe identifier and leaf key, and returns a proof +// key. +func NewProofKey(id universe.Identifier, key universe.LeafKey) ProofKey { + idBytes := id.Bytes() + leafKeyBytes := key.UniverseKey() + + // The proof key maps down the ID and the leaf key into a single + // 32-byte value: sha256(id || leaf_key).. + h := sha256.New() + h.Write(idBytes[:]) + h.Write(leafKeyBytes[:]) + + return fn.ToArray[ProofKey](h.Sum(nil)) +} + +// cachedProofs is a list of cached proof leaves. +type cachedProofs []*universe.Proof + +// Size just returns 1 as we're limiting based on the total number different +// leaf keys we query by. So we might store more than one proof per cache entry +// if the universe key's script key isn't set. But we only want a certain number +// of different keys stored in the cache. +func (c *cachedProofs) Size() (uint64, error) { + return 1, nil +} + +// newProofCache creates a new leaf proof cache. +func newProofCache(proofCacheSize uint64) *lru.Cache[ProofKey, *cachedProofs] { + return lru.NewCache[ProofKey, *cachedProofs](proofCacheSize) +} + +// universeIDKey is a cache key that is used to uniquely identify a universe +// within a multiverse tree cache. +type universeIDKey = string + +// universeProofCache a map of proof caches for each proof type. +type universeProofCache struct { + proofsPerUniverse uint64 + + lnutils.SyncMap[universeIDKey, *lru.Cache[ProofKey, *cachedProofs]] + + *cacheLogger +} + +// newUniverseProofCache creates a new proof cache. +func newUniverseProofCache(proofsPerUniverse uint64) *universeProofCache { + return &universeProofCache{ + proofsPerUniverse: proofsPerUniverse, + SyncMap: lnutils.SyncMap[ + universeIDKey, *lru.Cache[ProofKey, *cachedProofs], + ]{}, + cacheLogger: newCacheLogger("universe_proofs"), + } +} + +// fetchProof reads the cached proof for the given ID and leaf key. +func (p *universeProofCache) fetchProof(id universe.Identifier, + leafKey universe.LeafKey) []*universe.Proof { + + // First, get the sub-cache for this universe ID from the map of + // caches. + assetProofCache, _ := p.LoadOrStore( + id.String(), newProofCache(p.proofsPerUniverse), + ) + + // With that lower level cache obtained, we can check to see if we have + // a hit or not. + proofKey := NewProofKey(id, leafKey) + proofFromCache, err := assetProofCache.Get(proofKey) + if err == nil { + p.Hit() + return *proofFromCache + } + + p.Miss() + + return nil +} + +// insertProofs inserts the given proofs into the cache. +func (p *universeProofCache) insertProofs(id universe.Identifier, + leafKey universe.LeafKey, proof []*universe.Proof) { + + assetProofCache, _ := p.LoadOrStore( + id.String(), newProofCache(p.proofsPerUniverse), + ) + + proofKey := NewProofKey(id, leafKey) + + log.Debugf("storing proof for %v+%v in cache, key=%x", + id.StringForLog(), leafKey, proofKey[:]) + + proofVal := cachedProofs(proof) + if _, err := assetProofCache.Put(proofKey, &proofVal); err != nil { + log.Errorf("unable to insert into proof cache: %v", err) + } +} + +// delProofsForAsset deletes all the proofs for the given asset. +func (p *universeProofCache) delProofsForAsset(id universe.Identifier) { + log.Debugf("wiping proofs for %v from cache", id) + + p.Delete(id.String()) +} + +// rootPageQueryKey is a cache key that wraps around a query to fetch all the +// roots, but with pagination parameters. +type rootPageQueryKey struct { + withAmountsById bool + leafQueryKey +} + +// newRootPageQuery creates a new root page query. +func newRootPageQuery(q universe.RootNodesQuery) rootPageQueryKey { + return rootPageQueryKey{ + withAmountsById: q.WithAmountsById, + leafQueryKey: leafQueryKey{ + sortDirection: q.SortDirection, + offset: q.Offset, + limit: q.Limit, + }, + } +} + +// universeRootPage is a single page of roots. +type universeRootPage []universe.Root + +// Size is the amount of roots in the page. +func (u universeRootPage) Size() (uint64, error) { + return uint64(len(u)), nil +} + +// rootPageCache is used to store the latest root pages for a given +// universeIDKey. +type rootPageCache struct { + atomic.Pointer[lru.Cache[rootPageQueryKey, universeRootPage]] +} + +// newRootPageCache creates a new atomic root cache. +func newRootPageCache(cacheSize uint64) *rootPageCache { + var cache rootPageCache + cache.wipe(cacheSize) + + return &cache +} + +// wipe wipes the cache. +func (r *rootPageCache) wipe(cacheSize uint64) { + rootCache := lru.NewCache[rootPageQueryKey, universeRootPage](cacheSize) + r.Store(rootCache) +} + +// rootIndex maps a tree ID to a universe root. +type rootIndex struct { + atomic.Pointer[lnutils.SyncMap[universeIDKey, *universe.Root]] +} + +// newRootIndex creates a new atomic root index. +func newRootIndex() *rootIndex { + var a rootIndex + a.wipe() + + return &a +} + +// wipe wipes the cache. +func (r *rootIndex) wipe() { + var idx lnutils.SyncMap[universeIDKey, *universe.Root] + r.Store(&idx) +} + +// syncerRootNodeCache is used to cache the set of active root nodes for the +// multiverse tree, which is specifically kept for the universe sync. +type syncerRootNodeCache struct { + sync.RWMutex + + // enabled is a flag that indicates if the cache is enabled. + enabled bool + + // preAllocSize is the pre-allocated size of the cache. + preAllocSize uint64 + + // universeKeyList is the list of all keys of the universes that are + // currently known in this multiverse. This slice is sorted by the key + // (which might differ from the database ordering) and can be used to + // paginate through the universes efficiently. The universe identifier + // key also contains the proof type, so there will be two entries + // per asset ID or group key, one for issuance and one for transfer + // universes. The universe roots map below will contain a root for each + // of these universes. The stable sort order allows us to add and remove + // entries without needing to fetch the entire list from the database, + // as remove can be done by binary search and add can be done by + // inserting and then sorting again. + universeKeyList []universe.IdentifierKey + + // universeRoots is a map of universe ID key to the root of the + // universe. This map is needed to look up the root of an individual + // universe quickly, or to look up the roots of all the universes when + // paging through them. This map never needs to be cleared, as universe + // roots are only added (new issuance or transfer events) or modified + // (re-issuance into grouped asset or new transfer) but rarely deleted. + // Therefore, we never need to do a full wipe of the cache. + universeRoots map[universe.IdentifierKey]universe.Root + + *cacheLogger +} + +// newSyncerRootNodeCache creates a new root node cache. +func newSyncerRootNodeCache(enabled bool, + preAllocSize uint64) *syncerRootNodeCache { + + rootsMap := make(map[universe.IdentifierKey]universe.Root) + if enabled { + rootsMap = make( + map[universe.IdentifierKey]universe.Root, preAllocSize, + ) + } + + return &syncerRootNodeCache{ + preAllocSize: preAllocSize, + universeRoots: rootsMap, + cacheLogger: newCacheLogger("syncer_universe_roots"), + } +} + +// isQueryForSyncerCache returns true if the given query can be served from the +// syncer cache. We explicitly only cache the syncer queries in the syncer +// cache, which _always_ queries with sort direction "ascending" and no amounts +// by ID. For any other query, we'll go to the LRU based secondary cache or the +// database. +func isQueryForSyncerCache(q universe.RootNodesQuery) bool { + if q.WithAmountsById || q.SortDirection != universe.SortAscending { + return false + } + + return true +} + +// fetchRoots reads the cached roots for the given proof type. If the amounts +// are needed, then we return nothing so we go to the database to fetch the +// information. The boolean indicates if there are more roots available because +// the caller has reached the end of the list with the given offset. +func (r *syncerRootNodeCache) fetchRoots(q universe.RootNodesQuery, + haveWriteLock bool) ([]universe.Root, bool) { + + // We shouldn't be called for a query that can't be served from the + // cache. But in case we are, we'll just short-cut here. + if !isQueryForSyncerCache(q) || !r.enabled { + return nil, false + } + + // If we've acquired the write lock because we're doing a last lookup + // before potentially populating the cache, we don't need to acquire the + // read lock. If we're just normally reading from the cache, we'll need + // to acquire the read lock. + if !haveWriteLock { + r.RLock() + defer r.RUnlock() + } + + // If the cache is empty, we'll short-cut as well. + if len(r.universeRoots) == 0 { + // This is a miss, but we'll return nil to indicate that we + // don't have any roots. + r.Miss() + + return nil, false + } + + offset := q.Offset + limit := q.Limit + + // Is the page valid? + if offset < 0 || limit <= 0 { + log.Warnf("Invalid page query for syncer cache: offset=%v, "+ + "limit=%v", offset, limit) + + return nil, false + } + + // Because the cache is not empty, and we know it should contain all + // roots, we know the caller has reached the end of the list when their + // offset is larger than the number of roots. Since this is a "legal" + // query (how else would they know they're at the end?), we'll return + // an empty list and a boolean to indicate that there are no more roots. + if offset >= int32(len(r.universeRoots)) { + return nil, true + } + + endIndex := offset + limit + if endIndex > int32(len(r.universeRoots)) { + endIndex = int32(len(r.universeRoots)) + } + + rootNodeIDs := r.universeKeyList[offset:endIndex] + rootNodes := make([]universe.Root, len(rootNodeIDs)) + for idx, id := range rootNodeIDs { + root, ok := r.universeRoots[id] + if !ok { + // This should never happen, the two maps should be in + // sync. + log.Errorf("Root key %x found in cache list but not "+ + "in map", id[:]) + r.Miss() + + return nil, false + } + + rootNodes[idx] = root + } + + // This was a cache hit. + r.Hit() + + return rootNodes, false +} + +// fetchRoot reads the cached root for the given ID. +func (r *syncerRootNodeCache) fetchRoot(id universe.Identifier) *universe.Root { + if !r.enabled { + return nil + } + + r.RLock() + defer r.RUnlock() + + root, ok := r.universeRoots[id.Key()] + if !ok { + r.Miss() + + return nil + } + + r.Hit() + return &root +} + +// sortKeys sorts the universe key list. +// +// NOTE: This method must be called while holding the syncer cache lock. +func (r *syncerRootNodeCache) sortKeys() { + // To make sure we can easily add and remove entries, we sort the + // universe list by the key. This order will be different from the + // database order, but that's fine. + sort.Slice(r.universeKeyList, func(i, j int) bool { + return bytes.Compare( + r.universeKeyList[i][:], r.universeKeyList[j][:], + ) < 0 + }) +} + +// replaceCache replaces the cache with the given roots. +// +// NOTE: This method must be called while holding the syncer cache lock. +func (r *syncerRootNodeCache) replaceCache(newRoots []universe.Root) { + if !r.enabled { + return + } + + r.universeKeyList = make([]universe.IdentifierKey, len(newRoots)) + for idx, root := range newRoots { + r.universeKeyList[idx] = root.ID.Key() + r.universeRoots[root.ID.Key()] = root + } + + r.sortKeys() +} + +// addOrReplace adds a single root to the cache if it isn't already present or +// replaces the existing value if it is. +func (r *syncerRootNodeCache) addOrReplace(root universe.Root) { + if !r.enabled { + return + } + + r.Lock() + defer r.Unlock() + + if _, ok := r.universeRoots[root.ID.Key()]; ok { + // If the root is already in the cache, we'll just replace it in + // the map. The key list doesn't need to be updated, as the key + // never changes. + r.universeRoots[root.ID.Key()] = root + + return + } + + r.universeKeyList = append(r.universeKeyList, root.ID.Key()) + r.universeRoots[root.ID.Key()] = root + + r.sortKeys() +} + +// remove removes a single root from the cache. +func (r *syncerRootNodeCache) remove(key universe.IdentifierKey) { + if !r.enabled { + return + } + + r.Lock() + defer r.Unlock() + + idx := sort.Search(len(r.universeKeyList), func(i int) bool { + return bytes.Compare(r.universeKeyList[i][:], key[:]) >= 0 + }) + if idx < len(r.universeKeyList) && r.universeKeyList[idx] == key { + // Remove the entry from the list. + r.universeKeyList = slices.Delete(r.universeKeyList, idx, idx+1) + + // Remove the entry from the map. + delete(r.universeRoots, key) + } +} + +// isEmpty returns true if the cache is empty. +func (r *syncerRootNodeCache) isEmpty() bool { + r.RLock() + defer r.RUnlock() + + return len(r.universeKeyList) == 0 +} + +// rootNodeCache is used to cache the set of active root nodes for the +// multiverse tree. +type rootNodeCache struct { + sync.RWMutex + + cacheSize uint64 + + rootIndex *rootIndex + + allRoots *rootPageCache + + *cacheLogger + + // TODO(roasbeef): cache for issuance vs transfer roots? +} + +// newRootNodeCache creates a new root node cache. +func newRootNodeCache(cacheSize uint64) *rootNodeCache { + return &rootNodeCache{ + cacheSize: cacheSize, + rootIndex: newRootIndex(), + allRoots: newRootPageCache(cacheSize), + cacheLogger: newCacheLogger("universe_roots"), + } +} + +// fetchRoots reads the cached roots for the given proof type. If the amounts +// are needed, then we return nothing so we go to the database to fetch the +// information. +func (r *rootNodeCache) fetchRoots(q universe.RootNodesQuery, + haveWriteLock bool) []universe.Root { + + // If we have the write lock already, no need to fetch it. + if !haveWriteLock { + r.RLock() + defer r.RUnlock() + } + + // Attempt to read directly from the root node cache. + rootNodeCache := r.allRoots.Load() + rootNodes, _ := rootNodeCache.Get(newRootPageQuery(q)) + + if len(rootNodes) > 0 { + r.Hit() + } else { + r.Miss() + } + + return rootNodes +} + +// cacheRoots stores the given roots in the cache. +func (r *rootNodeCache) cacheRoots(q universe.RootNodesQuery, + rootNodes []universe.Root) { + + log.Debugf("caching num_roots=%v", len(rootNodes)) + + // Store the main root pointer, then update the root index. + rootPageCache := r.allRoots.Load() + _, err := rootPageCache.Put(newRootPageQuery(q), rootNodes) + if err != nil { + log.Errorf("unable to insert into root cache: %v", err) + } + + rootIndex := r.rootIndex.Load() + for _, rootNode := range rootNodes { + rootNode := rootNode + rootIndex.Store(rootNode.ID.String(), &rootNode) + } +} + +// wipeCache wipes all the cached roots. +func (r *rootNodeCache) wipeCache() { + log.Debugf("wiping universe cache") + + r.allRoots.wipe(r.cacheSize) + r.rootIndex.wipe() +} + +// cachedLeafKeys is used to cache the set of leaf keys for a given universe. +type cachedLeafKeys []universe.LeafKey + +// Size just returns 1, as we cache based on the total number of assets, but +// not the sum of their leaves. +func (c cachedLeafKeys) Size() (uint64, error) { + return uint64(1), nil +} + +// leafQueryKey is a wrapper around the existing UniverseLeafKeysQuery struct +// that doesn't include a pointer so it can be safely used as a map key. +type leafQueryKey struct { + sortDirection universe.SortDirection + offset int32 + limit int32 +} + +// newLeafQuery creates a new leaf query. +func newLeafQuery(q universe.UniverseLeafKeysQuery) leafQueryKey { + return leafQueryKey{ + sortDirection: q.SortDirection, + offset: q.Offset, + limit: q.Limit, + } +} + +// leafPageCache caches the various paginated responses for a given +// universeIDKey. +type leafPageCache struct { + *lru.Cache[leafQueryKey, *cachedLeafKeys] +} + +// Size returns the number of elements in the leaf page cache. +func (l *leafPageCache) Size() (uint64, error) { + return uint64(l.Len()), nil +} + +// universeLeafCaches is used to cache the set of leaf keys for a given +// universe. +type universeLeafPageCache struct { + sync.Mutex + + leavesPerUniverse uint64 + + leafCache *lru.Cache[universeIDKey, *leafPageCache] + + *cacheLogger +} + +// newUniverseLeafPageCache creates a new universe leaf cache. +func newUniverseLeafPageCache(numCachedUniverses, + leavesPerUniverse uint64) *universeLeafPageCache { + + return &universeLeafPageCache{ + leavesPerUniverse: leavesPerUniverse, + leafCache: lru.NewCache[universeIDKey, *leafPageCache]( + numCachedUniverses, + ), + cacheLogger: newCacheLogger("universe_leaf_keys"), + } +} + +// fetchLeafKeys reads the cached leaf keys for the given ID. +func (u *universeLeafPageCache) fetchLeafKeys( + q universe.UniverseLeafKeysQuery) []universe.LeafKey { + + leafPageCache, err := u.leafCache.Get(q.Id.String()) + if err == nil { + leafKeys, err := leafPageCache.Get(newLeafQuery(q)) + if err == nil { + u.Hit() + log.Tracef("read leaf keys for %v from cache", + q.Id.StringForLog()) + return *leafKeys + } + } + + u.Miss() + + return nil +} + +// cacheLeafKeys stores the given leaf keys in the cache. +func (u *universeLeafPageCache) cacheLeafKeys(q universe.UniverseLeafKeysQuery, + keys []universe.LeafKey) { + + cachedKeys := cachedLeafKeys(keys) + + idStr := q.Id.String() + + log.Debugf("storing leaf keys for %v in cache", q.Id.StringForLog()) + + pageCache, err := u.leafCache.Get(idStr) + if err != nil { + // No page cache yet, so we'll create one now. + pageCache = &leafPageCache{ + Cache: lru.NewCache[leafQueryKey, *cachedLeafKeys]( + u.leavesPerUniverse, + ), + } + + // Store the cache in the top level cache. + if _, err := u.leafCache.Put(idStr, pageCache); err != nil { + // If we encounter an error here, we'll exit to avoid a + // panic below. + log.Errorf("unable to store entry in page cache: %v", + err) + return + } + } + + // Add the to the page cache. + if _, err := pageCache.Put(newLeafQuery(q), &cachedKeys); err != nil { + log.Errorf("unable to store leaf resp: %v", err) + } +} + +// wipeCache wipes the cache of leaf keys for a given universe ID. +func (u *universeLeafPageCache) wipeCache(id universeIDKey) { + log.Debugf("wiping leaf keys for %s in cache", id) + + u.leafCache.Delete(id) +} diff --git a/tapdb/multiverse_cache_test.go b/tapdb/multiverse_cache_test.go new file mode 100644 index 000000000..dd3e9bd13 --- /dev/null +++ b/tapdb/multiverse_cache_test.go @@ -0,0 +1,277 @@ +package tapdb + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/internal/test" + "github.com/lightninglabs/taproot-assets/mssmt" + "github.com/lightninglabs/taproot-assets/universe" + "github.com/stretchr/testify/require" +) + +// TestMultiverseRootsCachePerformance tests the cache hit vs. miss ratio of the +// multiverse store's root node cache. +func TestMultiverseRootsCachePerformance(t *testing.T) { + ctx := context.Background() + multiverse, _ := newTestMultiverse(t) + + // We insert many assets into the multiverse store. + const numAssets = 300 + var ( + batch []*universe.Item + allLeaves []*universe.Item + ) + for i := 0; i < numAssets; i++ { + leaf := genRandomAsset(t) + batch = append(batch, leaf) + allLeaves = append(allLeaves, leaf) + + if i != 0 && (i+1)%100 == 0 { + err := multiverse.UpsertProofLeafBatch(ctx, batch) + require.NoError(t, err) + + t.Logf("Inserted %d assets", i+1) + batch = nil + } + } + + // Let's fetch all roots. The cache should be completely empty at this + // point, so we should have all misses. + const pageSize = 64 + roots := queryRoots(t, multiverse, pageSize) + require.Len(t, roots, numAssets) + assertAllLeavesInRoots(t, allLeaves, roots) + + // We need to round up, since we need to make another query for the + // remainder. And the way the cache query is built (query, if not found, + // acquire lock, query again), we always make two queries for each page. + numMisses := ((numAssets / pageSize) + 1) * 2 + require.EqualValues(t, 0, multiverse.rootNodeCache.hit.Load()) + require.EqualValues( + t, numMisses, multiverse.rootNodeCache.miss.Load(), + ) + + // Now we'll fetch all assets again, this should be a cache hit for all + // of them. + roots = queryRoots(t, multiverse, pageSize) + require.Len(t, roots, numAssets) + assertAllLeavesInRoots(t, allLeaves, roots) + + numHits := (numAssets / pageSize) + 1 + require.EqualValues(t, numHits, multiverse.rootNodeCache.hit.Load()) + require.EqualValues( + t, numMisses, multiverse.rootNodeCache.miss.Load(), + ) + + // We now turn on the syncer proof and make sure all our queries are + // served by that cache. + multiverse.syncerCache.enabled = true + multiverse.cfg.Caches.SyncerCacheEnabled = true + roots = queryRoots(t, multiverse, pageSize) + require.Len(t, roots, numAssets) + assertAllLeavesInRoots(t, allLeaves, roots) + + // The old page based cache should still have exactly the same numbers + // as before. + require.EqualValues(t, numHits, multiverse.rootNodeCache.hit.Load()) + require.EqualValues( + t, numMisses, multiverse.rootNodeCache.miss.Load(), + ) + + // The new syncer cache should only have two misses, one from when the + // cache was empty, one from after acquiring the write lock and all + // other queries should be hits. + require.EqualValues(t, 2, multiverse.syncerCache.miss.Load()) + require.EqualValues(t, numHits, multiverse.syncerCache.hit.Load()) +} + +// TestMultiverseSyncerCache tests the syncer cache of the multiverse store. +func TestMultiverseSyncerCache(t *testing.T) { + ctx := context.Background() + multiverse, _ := newTestMultiverse(t) + multiverse.syncerCache.enabled = true + multiverse.cfg.Caches.SyncerCacheEnabled = true + + // We insert a couple of assets into the multiverse store. + const ( + numAssets = 50 + pageSize = 10 + ) + var allLeaves []*universe.Item + for i := 0; i < numAssets; i++ { + leaf := genRandomAsset(t) + allLeaves = append(allLeaves, leaf) + } + + err := multiverse.UpsertProofLeafBatch(ctx, allLeaves) + require.NoError(t, err) + + // We query all roots and make sure they are all there. This will also + // cause the syncer cache to be filled. + originalRoots := queryRoots(t, multiverse, pageSize) + require.Len(t, originalRoots, numAssets) + assertAllLeavesInRoots(t, allLeaves, originalRoots) + + // Because we've enabled the cache from the beginning, the leaves + // inserted into the DB above should already be in the cache. That means + // we should have zero misses. + hitsPerFetch := numAssets / pageSize + require.EqualValues(t, 0, multiverse.syncerCache.miss.Load()) + require.EqualValues(t, hitsPerFetch, multiverse.syncerCache.hit.Load()) + + // We now randomly remove and re-insert some of the assets. The result + // should always be identical to the original one. + for i := 0; i < numAssets*100; i++ { + // Remove a random root from the cache. + root := originalRoots[test.RandIntn(len(originalRoots))] + cache := multiverse.syncerCache + cache.remove(root.ID.Key()) + + // The key should be removed from the cache. + require.Len(t, cache.universeKeyList, numAssets-1) + require.Len(t, cache.universeRoots, numAssets-1) + require.NotContains(t, cache.universeKeyList, root.ID.Key()) + require.NotContains(t, cache.universeRoots, root.ID.Key()) + + // Re-insert the root. + cache.addOrReplace(root) + + require.Len(t, cache.universeKeyList, numAssets) + require.Len(t, cache.universeRoots, numAssets) + require.Contains(t, cache.universeKeyList, root.ID.Key()) + require.Contains(t, cache.universeRoots, root.ID.Key()) + + roots := queryRoots(t, multiverse, pageSize) + require.Len(t, roots, numAssets) + require.Equal(t, originalRoots, roots) + + // No matter how we manipulate the entries, we should always hit + // the cache for syncer queries. + hits := hitsPerFetch * (i + 2) + require.EqualValues(t, 0, multiverse.syncerCache.miss.Load()) + require.EqualValues(t, hits, multiverse.syncerCache.hit.Load()) + } +} + +func genRandomAsset(t *testing.T) *universe.Item { + proofType := universe.ProofTypeIssuance + if test.RandBool() { + proofType = universe.ProofTypeTransfer + } + + assetGen := asset.RandGenesis(t, asset.Normal) + id := randUniverseID(t, test.RandBool(), withProofType(proofType)) + leaf := randMintingLeaf(t, assetGen, id.GroupKey) + id.AssetID = leaf.Asset.ID() + targetKey := randLeafKey(t) + + // For transfer proofs, we'll modify the witness asset proof to look + // more like a transfer. + if proofType == universe.ProofTypeTransfer { + prevWitnesses := leaf.Asset.PrevWitnesses + prevWitnesses[0].TxWitness = [][]byte{ + {1}, {1}, {1}, + } + prevID := prevWitnesses[0].PrevID + prevID.OutPoint.Hash = [32]byte{1} + } + + return &universe.Item{ + ID: id, + Key: targetKey, + Leaf: &leaf, + LogProofSync: false, + } +} + +// TestSyncerCacheMemoryUsage tests the memory usage of the syncer cache. +func TestSyncerCacheMemoryUsage(t *testing.T) { + for _, numRoots := range []uint64{500, 5_000, 50_000} { + allRoots := make([]universe.Root, numRoots) + start := time.Now() + for i := uint64(0); i < numRoots; i++ { + proofType := universe.ProofTypeIssuance + if test.RandBool() { + proofType = universe.ProofTypeTransfer + } + + assetGen := asset.RandGenesis(t, asset.Normal) + id := randUniverseID( + t, test.RandBool(), withProofType(proofType), + ) + allRoots[i] = universe.Root{ + ID: id, + AssetName: assetGen.Tag, + Node: mssmt.NewComputedBranch( + id.Bytes(), 1, + ), + } + } + t.Logf("Generated %d roots in %v", numRoots, time.Since(start)) + + t.Run(fmt.Sprintf("%d roots", numRoots), func(t *testing.T) { + res := testing.Benchmark(func(b *testing.B) { + b.ReportAllocs() + + cache := newSyncerRootNodeCache(true, numRoots) + cache.replaceCache(allRoots) + }) + + t.Logf("Memory usage for %d roots: %d bytes", + numRoots, res.MemBytes) + t.Logf("Memory usage per root: %d bytes", + res.MemBytes/numRoots) + t.Logf("Benchmark took %v", res.T) + }) + } +} + +func queryRoots(t *testing.T, multiverse *MultiverseStore, + pageSize int32) []universe.Root { + + var ( + offset int32 + roots []universe.Root + ) + for { + newRoots, err := multiverse.RootNodes( + context.Background(), universe.RootNodesQuery{ + WithAmountsById: false, + SortDirection: universe.SortAscending, + Offset: offset, + Limit: pageSize, + }, + ) + require.NoError(t, err) + + roots = append(roots, newRoots...) + offset += pageSize + + if len(newRoots) < int(pageSize) { + break + } + } + + return roots +} + +func assertAllLeavesInRoots(t *testing.T, allLeaves []*universe.Item, + roots []universe.Root) { + + for idx, leaf := range allLeaves { + haveRoot := false + for _, root := range roots { + if root.ID.Bytes() == leaf.ID.Bytes() { + haveRoot = true + break + } + } + + require.Truef(t, haveRoot, "no root found for leaf with ID %s "+ + "idx %d", leaf.ID.StringForLog(), idx) + } +} diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go index 8a86b552d..5f9c49996 100644 --- a/tapdb/sqlutils_test.go +++ b/tapdb/sqlutils_test.go @@ -258,7 +258,9 @@ func newDbHandleFromDb(db *BaseDB) *DbHandler { return db.WithTx(tx) }, ) - multiverseStore := NewMultiverseStore(multiverseTxCreator) + multiverseStore := NewMultiverseStore( + multiverseTxCreator, DefaultMultiverseStoreConfig(), + ) // Gain a handle to the pending (minting) assets store. assetMintingDB := NewTransactionExecutor( diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index 5529d6272..f7c5badfd 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -172,7 +172,7 @@ type BatchedUniverseServerStore interface { } // assetSyncCfgs is a map of asset ID to universe specific sync config. -type assetSyncCfgs = lnutils.SyncMap[treeID, *universe.FedUniSyncConfig] +type assetSyncCfgs = lnutils.SyncMap[universeIDKey, *universe.FedUniSyncConfig] // globalSyncCfgs is a map of proof type to global sync config. type globalSyncCfgs = lnutils.SyncMap[ @@ -699,7 +699,7 @@ func (u *UniverseFederationDB) QueryFederationSyncConfigs( return true }) - u.assetCfgs.Load().Range(func(treeID treeID, + u.assetCfgs.Load().Range(func(treeID universeIDKey, cfg *universe.FedUniSyncConfig) bool { uniConfigs = append(uniConfigs, cfg) @@ -820,7 +820,7 @@ func (u *UniverseFederationDB) QueryFederationSyncConfigs( } assetCfgs := u.assetCfgs.Load() for _, uniCfg := range uniConfigs { - assetCfgs.Store(treeID(uniCfg.UniverseID.String()), uniCfg) + assetCfgs.Store(uniCfg.UniverseID.String(), uniCfg) } return globalConfigs, uniConfigs, nil diff --git a/tapdb/universe_test.go b/tapdb/universe_test.go index beda00e2d..11f13a958 100644 --- a/tapdb/universe_test.go +++ b/tapdb/universe_test.go @@ -40,7 +40,7 @@ func withProofType(proofType universe.ProofType) universeIDOptFunc { } } -func randUniverseID(t *testing.T, forceGroup bool, +func randUniverseID(t testing.TB, forceGroup bool, optFunctions ...universeIDOptFunc) universe.Identifier { opts := defaultUniverseIdOptions() @@ -91,7 +91,7 @@ func newTestMultiverse(t *testing.T) (*MultiverseStore, sqlc.Querier) { }, ) - return NewMultiverseStore(dbTxer), db + return NewMultiverseStore(dbTxer, DefaultMultiverseStoreConfig()), db } func newTestMultiverseWithDb(db *BaseDB) (*MultiverseStore, sqlc.Querier) { @@ -101,7 +101,7 @@ func newTestMultiverseWithDb(db *BaseDB) (*MultiverseStore, sqlc.Querier) { }, ) - return NewMultiverseStore(dbTxer), db + return NewMultiverseStore(dbTxer, DefaultMultiverseStoreConfig()), db } func newTestUniverseWithDb(db *BaseDB, @@ -541,7 +541,9 @@ func TestUniverseTreeIsolation(t *testing.T) { return db.WithTx(tx) }, ) - multiverse := NewMultiverseStore(multiverseDB) + multiverse := NewMultiverseStore( + multiverseDB, DefaultMultiverseStoreConfig(), + ) rootNodes, err := multiverse.RootNodes( ctx, universe.RootNodesQuery{ diff --git a/tapgarden/custodian_test.go b/tapgarden/custodian_test.go index 7afce99e7..900400252 100644 --- a/tapgarden/custodian_test.go +++ b/tapgarden/custodian_test.go @@ -134,7 +134,9 @@ func newProofArchiveForDB(t *testing.T, db *tapdb.BaseDB) (*proof.MultiArchiver, return db.WithTx(tx) }, ) - multiverse := tapdb.NewMultiverseStore(multiverseDB) + multiverse := tapdb.NewMultiverseStore( + multiverseDB, tapdb.DefaultMultiverseStoreConfig(), + ) return proofArchive, assetStore, multiverse } diff --git a/universe/interface.go b/universe/interface.go index 3e5ed9008..8192078ec 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -38,9 +38,13 @@ var ( const ( // MaxPageSize is the maximum page size that can be used when querying // for asset roots and leaves. - MaxPageSize = 512 + MaxPageSize = 16384 ) +// IdentifierKey is the compact representation of a universe identifier that can +// be used as a map key. +type IdentifierKey [33]byte + // Identifier is the identifier for a universe. type Identifier struct { // AssetID is the asset ID for the universe. @@ -64,6 +68,18 @@ func (i *Identifier) Bytes() [32]byte { return i.AssetID } +// Key returns a bytes representation of the ID with the proof type appended to +// the end. This contains the same information as the String method, but in a +// way more compact form (42 bytes less), so it can be used as a map key. +func (i *Identifier) Key() IdentifierKey { + id := i.Bytes() + var b [33]byte + copy(b[:], id[:]) + b[32] = byte(i.ProofType) + + return b +} + // String returns a string representation of the ID. func (i *Identifier) String() string { // The namespace is prefixed by the proof type. This is done to make it diff --git a/universe/syncer.go b/universe/syncer.go index 64a69cf90..104b24890 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -525,7 +525,9 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, // fetchAllRoots fetches all the roots from the remote Universe. This function // is used in order to isolate any logic related to the specifics of how we // fetch the data from the universe server. -func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) ([]Root, error) { +func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, + diffEngine DiffEngine) ([]Root, error) { + offset := int32(0) pageSize := defaultPageSize roots := make([]Root, 0) @@ -533,6 +535,7 @@ func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) for { log.Debugf("Fetching roots in range: %v to %v", offset, offset+pageSize) + tempRoots, err := diffEngine.RootNodes( ctx, RootNodesQuery{ WithAmountsById: false, @@ -541,16 +544,17 @@ func (s *SimpleSyncer) fetchAllRoots(ctx context.Context, diffEngine DiffEngine) Limit: pageSize, }, ) - if err != nil { return nil, err } - if len(tempRoots) == 0 { + roots = append(roots, tempRoots...) + + // If we're getting a partial page, then we know we're done. + if len(tempRoots) < int(pageSize) { break } - roots = append(roots, tempRoots...) offset += pageSize }