From b0a66c37637c33ca7e81134f709f62966882a039 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 10:04:20 -0600 Subject: [PATCH 01/20] use encounter --- go.mod | 2 +- go.sum | 4 ++++ indexer/indexer.go | 36 +++++++++++++++++++++++++++++++++--- 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index d1ba28a6..715c75a7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.3 + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index a5f470a3..212de8e9 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,10 @@ github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coinbase/rosetta-sdk-go v0.6.3 h1:PPj14tPJ7SFc8sY/hlwK8zddT7PKwWU2wicxyerDxlg= github.com/coinbase/rosetta-sdk-go v0.6.3/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9 h1://Tm6m41Us3u+RjurU2JvU3pSj4z0gwEji67tsYOJFE= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e h1:+mNLz2cgw+kgMQ+RxIwkmOOX1u9sc/fVrrQBk07iYFs= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index cd3f2809..b0d926c5 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -329,6 +329,36 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { ) } + ops := 0 + for _, transaction := range block.Transactions { + ops += len(transaction.Operations) + } + + logger.Debugw( + "block added", + "hash", block.BlockIdentifier.Hash, + "index", block.BlockIdentifier.Index, + "transactions", len(block.Transactions), + "ops", ops, + ) + + return nil +} + +// BlockEncountered is called by the syncer when a block is encountered. +func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) error { + logger := utils.ExtractLogger(ctx, "indexer") + + err := i.blockStorage.EncounterBlock(ctx, block) + if err != nil { + return fmt.Errorf( + "%w: unable to encounter block to storage %s:%d", + err, + block.BlockIdentifier.Hash, + block.BlockIdentifier.Index, + ) + } + ops := 0 // Close channels of all blocks waiting. @@ -375,11 +405,9 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { i.waiter.Unlock() logger.Debugw( - "block added", + "block encountered", "hash", block.BlockIdentifier.Hash, "index", block.BlockIdentifier.Index, - "transactions", len(block.Transactions), - "ops", ops, ) return nil @@ -445,6 +473,8 @@ func (i *Indexer) findCoin( ) } + // TODO: check encounter bank of coins to enable full pre-syncing + // Attempt to find coin coin, owner, err := i.coinStorage.GetCoinTransactional( ctx, From 2e0fcf50ebbec05bc7f696f4e510cc5236b2ac61 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 10:16:57 -0600 Subject: [PATCH 02/20] allow for populating coins from encounter cache --- indexer/indexer.go | 60 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index b0d926c5..5536446c 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/coinbase/rosetta-bitcoin/bitcoin" @@ -97,6 +98,13 @@ type Indexer struct { workers []modules.BlockWorker waiter *waitTable + + // store coins created in encountered before added + coinCache map[string]*types.AccountCoin + coinCacheMutex sync.Mutex + + encountered int64 // if increases, then retry coin lookup + encounteredMutex sync.Mutex } // CloseDatabase closes a storage.Database. This should be called @@ -200,6 +208,7 @@ func Initialize( blockStorage: blockStorage, waiter: newWaitTable(), asserter: asserter, + coinCache: map[string]*types.AccountCoin{}, } coinStorage := modules.NewCoinStorage( @@ -334,6 +343,19 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { ops += len(transaction.Operations) } + // clean cache intermediate + i.coinCacheMutex.Lock() + for _, tx := range block.Transactions { + for _, op := range tx.Operations { + if op.CoinChange == nil { + continue + } + + delete(i.coinCache, op.CoinChange.CoinIdentifier.Identifier) + } + } + i.coinCacheMutex.Unlock() + logger.Debugw( "block added", "hash", block.BlockIdentifier.Hash, @@ -349,6 +371,30 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) error { logger := utils.ExtractLogger(ctx, "indexer") + // load intermediate + i.coinCacheMutex.Lock() + for _, tx := range block.Transactions { + for _, op := range tx.Operations { + if op.CoinChange == nil { + continue + } + + i.coinCache[op.CoinChange.CoinIdentifier.Identifier] = &types.AccountCoin{ + Account: op.Account, + Coin: &types.Coin{ + CoinIdentifier: op.CoinChange.CoinIdentifier, + Amount: op.Amount, + }, + } + } + } + i.coinCacheMutex.Unlock() + + // Update so that lookers know it exists + i.encounteredMutex.Lock() + i.encountered++ + i.encounteredMutex.Unlock() + err := i.blockStorage.EncounterBlock(ctx, block) if err != nil { return fmt.Errorf( @@ -452,6 +498,7 @@ func (i *Indexer) findCoin( coinIdentifier string, ) (*types.Coin, *types.AccountIdentifier, error) { for ctx.Err() == nil { + startEncountered := i.encountered databaseTransaction := i.database.ReadTransaction(ctx) defer databaseTransaction.Discard(ctx) @@ -473,8 +520,6 @@ func (i *Indexer) findCoin( ) } - // TODO: check encounter bank of coins to enable full pre-syncing - // Attempt to find coin coin, owner, err := i.coinStorage.GetCoinTransactional( ctx, @@ -491,6 +536,15 @@ func (i *Indexer) findCoin( return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier) } + // Check encounter table + // TODO: check encounter bank of coins to enable full pre-syncing + // otherwise will still be stuck + i.coinCacheMutex.Lock() + if accCoin, ok := i.coinCache[coinIdentifier]; ok { + return accCoin.Coin, accCoin.Account, nil + } + i.coinCacheMutex.Unlock() + // Locking here prevents us from adding sending any done // signals while we are determining whether or not to add // to the WaitTable. @@ -505,7 +559,7 @@ func (i *Indexer) findCoin( // If the block has changed, we try to look up the transaction // again. - if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) { + if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.encountered != startEncountered { i.waiter.Unlock() continue } From df8aeef9d4200321151cea6f41fc1fc5c54783bf Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 10:37:39 -0600 Subject: [PATCH 03/20] only store coin created --- indexer/indexer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/indexer/indexer.go b/indexer/indexer.go index 5536446c..4fdadc8a 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -351,6 +351,10 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { continue } + if op.CoinChange.CoinAction != types.CoinCreated { + continue + } + delete(i.coinCache, op.CoinChange.CoinIdentifier.Identifier) } } @@ -379,6 +383,11 @@ func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) erro continue } + // We only care about newly accessible coins. + if op.CoinChange.CoinAction != types.CoinCreated { + continue + } + i.coinCache[op.CoinChange.CoinIdentifier.Identifier] = &types.AccountCoin{ Account: op.Account, Coin: &types.Coin{ From 3895f223ee449968440d7af563931e6fe2b37c41 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 10:53:01 -0600 Subject: [PATCH 04/20] fix locking issue --- indexer/indexer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 4fdadc8a..6158fd6b 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -549,10 +549,11 @@ func (i *Indexer) findCoin( // TODO: check encounter bank of coins to enable full pre-syncing // otherwise will still be stuck i.coinCacheMutex.Lock() - if accCoin, ok := i.coinCache[coinIdentifier]; ok { + accCoin, ok := i.coinCache[coinIdentifier] + i.coinCacheMutex.Unlock() + if ok { return accCoin.Coin, accCoin.Account, nil } - i.coinCacheMutex.Unlock() // Locking here prevents us from adding sending any done // signals while we are determining whether or not to add From 299a1942c02b99b282b00e86c17622b3b32417f6 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 11:00:45 -0600 Subject: [PATCH 05/20] debug nil result --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 715c75a7..6904e6ea 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 212de8e9..a53a2fd2 100644 --- a/go.sum +++ b/go.sum @@ -72,6 +72,8 @@ github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9 h1://Tm6 github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e h1:+mNLz2cgw+kgMQ+RxIwkmOOX1u9sc/fVrrQBk07iYFs= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 h1:rHlMsOe1YOoHMS06+aI7q//2WTbxrf2rXpACw/aRR60= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= From d94e76d40de0149b7ceab5f5ec78bf305d8b17d7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:14:26 -0600 Subject: [PATCH 06/20] Use priority mutex --- indexer/indexer.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 6158fd6b..881792c7 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -101,7 +101,7 @@ type Indexer struct { // store coins created in encountered before added coinCache map[string]*types.AccountCoin - coinCacheMutex sync.Mutex + coinCacheMutex *sdkUtils.PriorityMutex encountered int64 // if increases, then retry coin lookup encounteredMutex sync.Mutex @@ -200,15 +200,16 @@ func Initialize( } i := &Indexer{ - cancel: cancel, - network: config.Network, - pruningConfig: config.Pruning, - client: client, - database: localStore, - blockStorage: blockStorage, - waiter: newWaitTable(), - asserter: asserter, - coinCache: map[string]*types.AccountCoin{}, + cancel: cancel, + network: config.Network, + pruningConfig: config.Pruning, + client: client, + database: localStore, + blockStorage: blockStorage, + waiter: newWaitTable(), + asserter: asserter, + coinCache: map[string]*types.AccountCoin{}, + coinCacheMutex: new(sdkUtils.PriorityMutex), } coinStorage := modules.NewCoinStorage( @@ -344,7 +345,7 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } // clean cache intermediate - i.coinCacheMutex.Lock() + i.coinCacheMutex.Lock(true) for _, tx := range block.Transactions { for _, op := range tx.Operations { if op.CoinChange == nil { @@ -376,7 +377,7 @@ func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) erro logger := utils.ExtractLogger(ctx, "indexer") // load intermediate - i.coinCacheMutex.Lock() + i.coinCacheMutex.Lock(false) for _, tx := range block.Transactions { for _, op := range tx.Operations { if op.CoinChange == nil { @@ -548,7 +549,7 @@ func (i *Indexer) findCoin( // Check encounter table // TODO: check encounter bank of coins to enable full pre-syncing // otherwise will still be stuck - i.coinCacheMutex.Lock() + i.coinCacheMutex.Lock(false) accCoin, ok := i.coinCache[coinIdentifier] i.coinCacheMutex.Unlock() if ok { From 64722f6c61ccfa52994c2461b13e8b634087e7a9 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:20:15 -0600 Subject: [PATCH 07/20] don't proactively orphan head --- indexer/indexer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 881792c7..5c3d3f46 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -623,9 +623,10 @@ func (i *Indexer) findCoins( btcBlock *bitcoin.Block, coins []string, ) (map[string]*types.AccountCoin, error) { - if err := i.checkHeaderMatch(ctx, btcBlock); err != nil { - return nil, fmt.Errorf("%w: check header match failed", err) - } + // TODO: we need to abort correctly when this occurs still + // if err := i.checkHeaderMatch(ctx, btcBlock); err != nil { + // return nil, fmt.Errorf("%w: check header match failed", err) + // } coinMap := map[string]*types.AccountCoin{} remainingCoins := []string{} From 3c09f20cf3b352ffbe079627b65974ae1cf4a25e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:27:24 -0600 Subject: [PATCH 08/20] skip if end block --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6904e6ea..2a1cc87a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index a53a2fd2..06e524de 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,8 @@ github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e h1:+mNLz github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 h1:rHlMsOe1YOoHMS06+aI7q//2WTbxrf2rXpACw/aRR60= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 h1:Y7JTSEw9lZezKc4NCEGqA0iN8Tr7wWR+RZP5gOvS/WY= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= From 98e7d672bbe04c9b278cd05f6dc6512f0f89ff13 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sat, 5 Dec 2020 14:38:17 -0600 Subject: [PATCH 09/20] fix accidental abort --- indexer/indexer.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 5c3d3f46..cb47f411 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -445,19 +445,21 @@ func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) erro // Look for all remaining waiting transactions associated // with the next block that have not yet been closed. We should // abort these waits as they will never be closed by a new transaction. - for txHash, val := range i.waiter.table { - if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed { - logger.Debugw( - "aborting channel", - "hash", block.BlockIdentifier.Hash, - "index", block.BlockIdentifier.Index, - "channel", txHash, - ) - val.channelClosed = true - val.aborted = true - close(val.channel) - } - } + + // TODO: need to fix this + // for txHash, val := range i.waiter.table { + // if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed { + // logger.Debugw( + // "aborting channel", + // "hash", block.BlockIdentifier.Hash, + // "index", block.BlockIdentifier.Index, + // "channel", txHash, + // ) + // val.channelClosed = true + // val.aborted = true + // close(val.channel) + // } + // } i.waiter.Unlock() logger.Debugw( From 7a26a2f9c2c8ab2efcdf34f3260f07b28197e5a2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 6 Dec 2020 10:14:49 -0600 Subject: [PATCH 10/20] debug deadlock --- go.mod | 2 +- go.sum | 2 ++ indexer/indexer.go | 52 +++++++++++++++++++++++----------------------- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 2a1cc87a..b96c71f1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 06e524de..3530ff43 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,8 @@ github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 h1:rHlMs github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 h1:Y7JTSEw9lZezKc4NCEGqA0iN8Tr7wWR+RZP5gOvS/WY= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7 h1:53EVYPuXEXL/3/OuGtWy0gtz/2HSiMhKMSWrgEEoudY= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index cb47f411..b437657f 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -361,6 +361,25 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.coinCacheMutex.Unlock() + // Look for all remaining waiting transactions associated + // with the next block that have not yet been closed. We should + // abort these waits as they will never be closed by a new transaction. + i.waiter.Lock() + for txHash, val := range i.waiter.table { + if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed { + logger.Debugw( + "aborting channel", + "hash", block.BlockIdentifier.Hash, + "index", block.BlockIdentifier.Index, + "channel", txHash, + ) + val.channelClosed = true + val.aborted = true + close(val.channel) + } + } + i.waiter.Unlock() + logger.Debugw( "block added", "hash", block.BlockIdentifier.Hash, @@ -372,8 +391,8 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { return nil } -// BlockEncountered is called by the syncer when a block is encountered. -func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) error { +// BlockSeen is called by the syncer when a block is encountered. +func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { logger := utils.ExtractLogger(ctx, "indexer") // load intermediate @@ -405,7 +424,7 @@ func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) erro i.encountered++ i.encounteredMutex.Unlock() - err := i.blockStorage.EncounterBlock(ctx, block) + err := i.blockStorage.SeeBlock(ctx, block) if err != nil { return fmt.Errorf( "%w: unable to encounter block to storage %s:%d", @@ -441,25 +460,6 @@ func (i *Indexer) BlockEncountered(ctx context.Context, block *types.Block) erro val.channelClosed = true close(val.channel) } - - // Look for all remaining waiting transactions associated - // with the next block that have not yet been closed. We should - // abort these waits as they will never be closed by a new transaction. - - // TODO: need to fix this - // for txHash, val := range i.waiter.table { - // if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed { - // logger.Debugw( - // "aborting channel", - // "hash", block.BlockIdentifier.Hash, - // "index", block.BlockIdentifier.Index, - // "channel", txHash, - // ) - // val.channelClosed = true - // val.aborted = true - // close(val.channel) - // } - // } i.waiter.Unlock() logger.Debugw( @@ -567,6 +567,7 @@ func (i *Indexer) findCoin( // we created our databaseTransaction. currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx) if err != nil { + i.waiter.Unlock() return nil, nil, fmt.Errorf("%w: unable to get head block identifier", err) } @@ -625,10 +626,9 @@ func (i *Indexer) findCoins( btcBlock *bitcoin.Block, coins []string, ) (map[string]*types.AccountCoin, error) { - // TODO: we need to abort correctly when this occurs still - // if err := i.checkHeaderMatch(ctx, btcBlock); err != nil { - // return nil, fmt.Errorf("%w: check header match failed", err) - // } + if err := i.checkHeaderMatch(ctx, btcBlock); err != nil { + return nil, fmt.Errorf("%w: check header match failed", err) + } coinMap := map[string]*types.AccountCoin{} remainingCoins := []string{} From 60c852e1d639972949a9fec8a526bf2e9fbf6f2d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Sun, 6 Dec 2020 10:18:42 -0600 Subject: [PATCH 11/20] fix logging message --- indexer/indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index b437657f..dd6de23e 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -463,7 +463,7 @@ func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { i.waiter.Unlock() logger.Debugw( - "block encountered", + "block seen", "hash", block.BlockIdentifier.Hash, "index", block.BlockIdentifier.Index, ) From 4ab141215fea76ce862c34febc3e9a7c716bce4e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 08:33:57 -0600 Subject: [PATCH 12/20] debug add block speed --- go.mod | 2 +- go.sum | 4 ++++ indexer/indexer.go | 12 ++++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b96c71f1..15b07d53 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7 + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 3530ff43..37692f41 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,10 @@ github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 h1:Y7JTS github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7 h1:53EVYPuXEXL/3/OuGtWy0gtz/2HSiMhKMSWrgEEoudY= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208 h1:arGGiIscN+E1kWi+nX31DftCfktR3d3bOC8ONRuPyko= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44 h1:jyV+DlKYbB2Vsz0PfdwoasnAITasdwcirUezOkSkrtM= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index dd6de23e..b328d84a 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -329,6 +329,8 @@ func (i *Indexer) Prune(ctx context.Context) error { func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { logger := utils.ExtractLogger(ctx, "indexer") + start := time.Now() + err := i.blockStorage.AddBlock(ctx, block) if err != nil { return fmt.Errorf( @@ -339,6 +341,8 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { ) } + addBlockTime := time.Since(start) + ops := 0 for _, transaction := range block.Transactions { ops += len(transaction.Operations) @@ -361,6 +365,8 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.coinCacheMutex.Unlock() + cleanCacheTime := time.Since(start) - addBlockTime + // Look for all remaining waiting transactions associated // with the next block that have not yet been closed. We should // abort these waits as they will never be closed by a new transaction. @@ -380,12 +386,18 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.waiter.Unlock() + waiterIteration := time.Since(start) - addBlockTime - cleanCacheTime + logger.Debugw( "block added", "hash", block.BlockIdentifier.Hash, "index", block.BlockIdentifier.Index, "transactions", len(block.Transactions), "ops", ops, + "total add time", time.Since(start), + "add time", addBlockTime, + "clean cache time", cleanCacheTime, + "waiter iteration", waiterIteration, ) return nil From f5a27e38d09f71b4e3853df30c8f4d530a64df83 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 16:39:30 -0600 Subject: [PATCH 13/20] pre-release test --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 15b07d53..c4b9ef39 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44 + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 37692f41..3088fbe8 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208 h1:arGGi github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44 h1:jyV+DlKYbB2Vsz0PfdwoasnAITasdwcirUezOkSkrtM= github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f h1:yD+9ZauO261vN9eR4R6KulToMztNY9KWROUW+8QGROc= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= From e194ff6819e5b25f676bae7f768c1f87aedf93af Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 7 Dec 2020 17:13:15 -0600 Subject: [PATCH 14/20] limit compression concurrency --- go.mod | 2 +- go.sum | 20 ++------------------ indexer/indexer.go | 8 ++++++++ 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index c4b9ef39..423edf7a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 3088fbe8..ca52ee53 100644 --- a/go.sum +++ b/go.sum @@ -66,24 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coinbase/rosetta-sdk-go v0.6.3 h1:PPj14tPJ7SFc8sY/hlwK8zddT7PKwWU2wicxyerDxlg= -github.com/coinbase/rosetta-sdk-go v0.6.3/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9 h1://Tm6m41Us3u+RjurU2JvU3pSj4z0gwEji67tsYOJFE= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205155400-04a04558f4a9/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e h1:+mNLz2cgw+kgMQ+RxIwkmOOX1u9sc/fVrrQBk07iYFs= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205160037-3f765639138e/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4 h1:rHlMsOe1YOoHMS06+aI7q//2WTbxrf2rXpACw/aRR60= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205170004-5e50069967e4/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386 h1:Y7JTSEw9lZezKc4NCEGqA0iN8Tr7wWR+RZP5gOvS/WY= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201205202626-20e9956fc386/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7 h1:53EVYPuXEXL/3/OuGtWy0gtz/2HSiMhKMSWrgEEoudY= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201206160637-6155702fc1d7/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208 h1:arGGiIscN+E1kWi+nX31DftCfktR3d3bOC8ONRuPyko= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207145242-fffc69dd8208/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44 h1:jyV+DlKYbB2Vsz0PfdwoasnAITasdwcirUezOkSkrtM= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207152233-89b1075cef44/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f h1:yD+9ZauO261vN9eR4R6KulToMztNY9KWROUW+8QGROc= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207223247-6a3d17c4fe1f/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef h1:9GZCUSMyTQATwWp2LTcHoIumPWFj65NOzR1gEWPBTak= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index b328d84a..2750b8a5 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "runtime" "sync" "time" @@ -59,6 +60,12 @@ const ( // zeroValue is 0 as a string zeroValue = "0" + + // overclockMultiplier is the amount + // we multiply runtime.NumCPU by to determine + // how many concurrent compressions we should + // perform when pre-storing block data. + overclockMultiplier = 4 ) var ( @@ -277,6 +284,7 @@ func (i *Indexer) Sync(ctx context.Context) error { syncer.WithCacheSize(syncer.DefaultCacheSize), syncer.WithSizeMultiplier(sizeMultiplier), syncer.WithPastBlocks(pastBlocks), + syncer.WithSeenConcurrency(int64(runtime.NumCPU()*overclockMultiplier)), ) return syncer.Sync(ctx, startIndex, indexPlaceholder) From 1038aeff7f830f05aafbb3ea32f54477dd19fbd7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 19:00:30 -0600 Subject: [PATCH 15/20] Use errgroup pool --- go.mod | 2 +- go.sum | 4 ++-- indexer/indexer.go | 15 +++++++++++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 423edf7a..12c376ee 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef + github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index ca52ee53..81a7d7db 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef h1:9GZCUSMyTQATwWp2LTcHoIumPWFj65NOzR1gEWPBTak= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201207231016-6dab4bf07aef/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd h1:X+esmCEka7IJWfEII4xKvXHmaRF2a7YMG0zngJMsgvg= +github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index 2750b8a5..80677b57 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -36,6 +36,7 @@ import ( sdkUtils "github.com/coinbase/rosetta-sdk-go/utils" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" + "golang.org/x/sync/semaphore" ) const ( @@ -66,6 +67,9 @@ const ( // how many concurrent compressions we should // perform when pre-storing block data. overclockMultiplier = 4 + + // semaphoreWeight is the weight of each semaphore request. + semaphoreWeight = int64(1) ) var ( @@ -112,6 +116,8 @@ type Indexer struct { encountered int64 // if increases, then retry coin lookup encounteredMutex sync.Mutex + + seenSemaphore *semaphore.Weighted } // CloseDatabase closes a storage.Database. This should be called @@ -193,7 +199,7 @@ func Initialize( return nil, fmt.Errorf("%w: unable to initialize storage", err) } - blockStorage := modules.NewBlockStorage(localStore) + blockStorage := modules.NewBlockStorage(localStore, runtime.NumCPU()*overclockMultiplier) asserter, err := asserter.NewClientWithOptions( config.Network, config.GenesisBlockIdentifier, @@ -217,6 +223,7 @@ func Initialize( asserter: asserter, coinCache: map[string]*types.AccountCoin{}, coinCacheMutex: new(sdkUtils.PriorityMutex), + seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())), } coinStorage := modules.NewCoinStorage( @@ -284,7 +291,6 @@ func (i *Indexer) Sync(ctx context.Context) error { syncer.WithCacheSize(syncer.DefaultCacheSize), syncer.WithSizeMultiplier(sizeMultiplier), syncer.WithPastBlocks(pastBlocks), - syncer.WithSeenConcurrency(int64(runtime.NumCPU()*overclockMultiplier)), ) return syncer.Sync(ctx, startIndex, indexPlaceholder) @@ -413,6 +419,11 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { // BlockSeen is called by the syncer when a block is encountered. func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { + if err := i.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil { + return err + } + defer i.seenSemaphore.Release(semaphoreWeight) + logger := utils.ExtractLogger(ctx, "indexer") // load intermediate From c72482f21fd13c6e507ba30f88aa0c2c105446cc Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 19:02:53 -0600 Subject: [PATCH 16/20] modify table and value log sizes --- indexer/indexer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 80677b57..03ac454a 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -148,11 +148,11 @@ func defaultBadgerOptions( opts.ValueLogLoadingMode = options.MemoryMap // Use an extended table size for larger commits. - opts.MaxTableSize = database.DefaultMaxTableSize + opts.MaxTableSize = 512 << 20 // Smaller value log sizes means smaller contiguous memory allocations // and less RAM usage on cleanup. - opts.ValueLogFileSize = database.DefaultLogValueSize + opts.ValueLogFileSize = 1024 << 20 // To allow writes at a faster speed, we create a new memtable as soon as // an existing memtable is filled up. This option determines how many From f02e95a31c45b250ce2500bfdfde9a55fdb0e232 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 19:08:43 -0600 Subject: [PATCH 17/20] increase overclock multiplier --- indexer/indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 03ac454a..dee142b9 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -66,7 +66,7 @@ const ( // we multiply runtime.NumCPU by to determine // how many concurrent compressions we should // perform when pre-storing block data. - overclockMultiplier = 4 + overclockMultiplier = 16 // semaphoreWeight is the weight of each semaphore request. semaphoreWeight = int64(1) From d415efba9b61741d0445af0bd17f5178b8a8f53f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 9 Dec 2020 12:56:46 -0600 Subject: [PATCH 18/20] update to official release --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 12c376ee..3d10bf54 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd + github.com/coinbase/rosetta-sdk-go v0.6.5 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 81a7d7db..75c15686 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd h1:X+esmCEka7IJWfEII4xKvXHmaRF2a7YMG0zngJMsgvg= -github.com/coinbase/rosetta-sdk-go v0.6.5-0.20201209004309-65038e6eaacd/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5 h1:RytFDCPXS64vEYwIOsxsoQGlZZyP9RQvzyYikxymI4w= +github.com/coinbase/rosetta-sdk-go v0.6.5/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= From c3182633494a39d691e5abd937bc12cf33524cc8 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 9 Dec 2020 13:07:09 -0600 Subject: [PATCH 19/20] indexer cleanup --- indexer/indexer.go | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index dee142b9..9f8bb199 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -64,8 +64,8 @@ const ( // overclockMultiplier is the amount // we multiply runtime.NumCPU by to determine - // how many concurrent compressions we should - // perform when pre-storing block data. + // how many goroutines we should + // spwan to handle block data sequencing. overclockMultiplier = 16 // semaphoreWeight is the weight of each semaphore request. @@ -110,12 +110,17 @@ type Indexer struct { waiter *waitTable - // store coins created in encountered before added + // Store coins created in pre-store before persisted + // in add block so we can optimistically populate + // blocks before committed. coinCache map[string]*types.AccountCoin coinCacheMutex *sdkUtils.PriorityMutex - encountered int64 // if increases, then retry coin lookup - encounteredMutex sync.Mutex + // When populating blocks using pre-stored blocks, + // we should retry if a new block was seen (similar + // to trying again if head block changes). + seen int64 + seenMutex sync.Mutex seenSemaphore *semaphore.Weighted } @@ -148,11 +153,11 @@ func defaultBadgerOptions( opts.ValueLogLoadingMode = options.MemoryMap // Use an extended table size for larger commits. - opts.MaxTableSize = 512 << 20 + opts.MaxTableSize = database.DefaultMaxTableSize // Smaller value log sizes means smaller contiguous memory allocations // and less RAM usage on cleanup. - opts.ValueLogFileSize = 1024 << 20 + opts.ValueLogFileSize = database.DefaultLogValueSize // To allow writes at a faster speed, we create a new memtable as soon as // an existing memtable is filled up. This option determines how many @@ -343,8 +348,6 @@ func (i *Indexer) Prune(ctx context.Context) error { func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { logger := utils.ExtractLogger(ctx, "indexer") - start := time.Now() - err := i.blockStorage.AddBlock(ctx, block) if err != nil { return fmt.Errorf( @@ -355,8 +358,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { ) } - addBlockTime := time.Since(start) - ops := 0 for _, transaction := range block.Transactions { ops += len(transaction.Operations) @@ -379,8 +380,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.coinCacheMutex.Unlock() - cleanCacheTime := time.Since(start) - addBlockTime - // Look for all remaining waiting transactions associated // with the next block that have not yet been closed. We should // abort these waits as they will never be closed by a new transaction. @@ -400,18 +399,12 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.waiter.Unlock() - waiterIteration := time.Since(start) - addBlockTime - cleanCacheTime - logger.Debugw( "block added", "hash", block.BlockIdentifier.Hash, "index", block.BlockIdentifier.Index, "transactions", len(block.Transactions), "ops", ops, - "total add time", time.Since(start), - "add time", addBlockTime, - "clean cache time", cleanCacheTime, - "waiter iteration", waiterIteration, ) return nil @@ -451,9 +444,9 @@ func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { i.coinCacheMutex.Unlock() // Update so that lookers know it exists - i.encounteredMutex.Lock() - i.encountered++ - i.encounteredMutex.Unlock() + i.seenMutex.Lock() + i.seen++ + i.seenMutex.Unlock() err := i.blockStorage.SeeBlock(ctx, block) if err != nil { @@ -541,7 +534,7 @@ func (i *Indexer) findCoin( coinIdentifier string, ) (*types.Coin, *types.AccountIdentifier, error) { for ctx.Err() == nil { - startEncountered := i.encountered + startSeen := i.seen databaseTransaction := i.database.ReadTransaction(ctx) defer databaseTransaction.Discard(ctx) @@ -579,9 +572,7 @@ func (i *Indexer) findCoin( return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier) } - // Check encounter table - // TODO: check encounter bank of coins to enable full pre-syncing - // otherwise will still be stuck + // Check seen CoinCache i.coinCacheMutex.Lock(false) accCoin, ok := i.coinCache[coinIdentifier] i.coinCacheMutex.Unlock() @@ -604,7 +595,7 @@ func (i *Indexer) findCoin( // If the block has changed, we try to look up the transaction // again. - if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.encountered != startEncountered { + if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen { i.waiter.Unlock() continue } From 8eaa063fdda66eea706b90a878ed334cb959d9f9 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 9 Dec 2020 13:13:21 -0600 Subject: [PATCH 20/20] update version --- services/network_service_test.go | 2 +- services/types.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/network_service_test.go b/services/network_service_test.go index ca3ce36b..74d238ca 100644 --- a/services/network_service_test.go +++ b/services/network_service_test.go @@ -27,7 +27,7 @@ import ( ) var ( - middlewareVersion = "0.0.8" + middlewareVersion = "0.0.9" defaultNetworkOptions = &types.NetworkOptionsResponse{ Version: &types.Version{ RosettaVersion: types.RosettaAPIVersion, diff --git a/services/types.go b/services/types.go index 2948b806..1937885d 100644 --- a/services/types.go +++ b/services/types.go @@ -45,7 +45,7 @@ const ( // variable instead of a constant because // we typically need the pointer of this // value. - MiddlewareVersion = "0.0.8" + MiddlewareVersion = "0.0.9" ) // Client is used by the servicers to get Peer information