diff --git a/go.mod b/go.mod index d1ba28a6..3d10bf54 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 - github.com/coinbase/rosetta-sdk-go v0.6.3 + github.com/coinbase/rosetta-sdk-go v0.6.5 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index a5f470a3..75c15686 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coinbase/rosetta-sdk-go v0.6.3 h1:PPj14tPJ7SFc8sY/hlwK8zddT7PKwWU2wicxyerDxlg= -github.com/coinbase/rosetta-sdk-go v0.6.3/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= +github.com/coinbase/rosetta-sdk-go v0.6.5 h1:RytFDCPXS64vEYwIOsxsoQGlZZyP9RQvzyYikxymI4w= +github.com/coinbase/rosetta-sdk-go v0.6.5/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/indexer/indexer.go b/indexer/indexer.go index cd3f2809..9f8bb199 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -18,6 +18,8 @@ import ( "context" "errors" "fmt" + "runtime" + "sync" "time" "github.com/coinbase/rosetta-bitcoin/bitcoin" @@ -34,6 +36,7 @@ import ( sdkUtils "github.com/coinbase/rosetta-sdk-go/utils" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" + "golang.org/x/sync/semaphore" ) const ( @@ -58,6 +61,15 @@ const ( // zeroValue is 0 as a string zeroValue = "0" + + // overclockMultiplier is the amount + // we multiply runtime.NumCPU by to determine + // how many goroutines we should + // spwan to handle block data sequencing. + overclockMultiplier = 16 + + // semaphoreWeight is the weight of each semaphore request. + semaphoreWeight = int64(1) ) var ( @@ -97,6 +109,20 @@ type Indexer struct { workers []modules.BlockWorker waiter *waitTable + + // Store coins created in pre-store before persisted + // in add block so we can optimistically populate + // blocks before committed. + coinCache map[string]*types.AccountCoin + coinCacheMutex *sdkUtils.PriorityMutex + + // When populating blocks using pre-stored blocks, + // we should retry if a new block was seen (similar + // to trying again if head block changes). + seen int64 + seenMutex sync.Mutex + + seenSemaphore *semaphore.Weighted } // CloseDatabase closes a storage.Database. This should be called @@ -178,7 +204,7 @@ func Initialize( return nil, fmt.Errorf("%w: unable to initialize storage", err) } - blockStorage := modules.NewBlockStorage(localStore) + blockStorage := modules.NewBlockStorage(localStore, runtime.NumCPU()*overclockMultiplier) asserter, err := asserter.NewClientWithOptions( config.Network, config.GenesisBlockIdentifier, @@ -192,14 +218,17 @@ func Initialize( } i := &Indexer{ - cancel: cancel, - network: config.Network, - pruningConfig: config.Pruning, - client: client, - database: localStore, - blockStorage: blockStorage, - waiter: newWaitTable(), - asserter: asserter, + cancel: cancel, + network: config.Network, + pruningConfig: config.Pruning, + client: client, + database: localStore, + blockStorage: blockStorage, + waiter: newWaitTable(), + asserter: asserter, + coinCache: map[string]*types.AccountCoin{}, + coinCacheMutex: new(sdkUtils.PriorityMutex), + seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())), } coinStorage := modules.NewCoinStorage( @@ -330,35 +359,31 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } ops := 0 - - // Close channels of all blocks waiting. - i.waiter.Lock() for _, transaction := range block.Transactions { ops += len(transaction.Operations) - txHash := transaction.TransactionIdentifier.Hash - val, ok := i.waiter.Get(txHash, false) - if !ok { - continue - } + } - if val.channelClosed { - logger.Debugw( - "channel already closed", - "hash", block.BlockIdentifier.Hash, - "index", block.BlockIdentifier.Index, - "channel", txHash, - ) - continue - } + // clean cache intermediate + i.coinCacheMutex.Lock(true) + for _, tx := range block.Transactions { + for _, op := range tx.Operations { + if op.CoinChange == nil { + continue + } - // Closing channel will cause all listeners to continue - val.channelClosed = true - close(val.channel) + if op.CoinChange.CoinAction != types.CoinCreated { + continue + } + + delete(i.coinCache, op.CoinChange.CoinIdentifier.Identifier) + } } + i.coinCacheMutex.Unlock() // Look for all remaining waiting transactions associated // with the next block that have not yet been closed. We should // abort these waits as they will never be closed by a new transaction. + i.waiter.Lock() for txHash, val := range i.waiter.table { if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed { logger.Debugw( @@ -385,6 +410,91 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { return nil } +// BlockSeen is called by the syncer when a block is encountered. +func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { + if err := i.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil { + return err + } + defer i.seenSemaphore.Release(semaphoreWeight) + + logger := utils.ExtractLogger(ctx, "indexer") + + // load intermediate + i.coinCacheMutex.Lock(false) + for _, tx := range block.Transactions { + for _, op := range tx.Operations { + if op.CoinChange == nil { + continue + } + + // We only care about newly accessible coins. + if op.CoinChange.CoinAction != types.CoinCreated { + continue + } + + i.coinCache[op.CoinChange.CoinIdentifier.Identifier] = &types.AccountCoin{ + Account: op.Account, + Coin: &types.Coin{ + CoinIdentifier: op.CoinChange.CoinIdentifier, + Amount: op.Amount, + }, + } + } + } + i.coinCacheMutex.Unlock() + + // Update so that lookers know it exists + i.seenMutex.Lock() + i.seen++ + i.seenMutex.Unlock() + + err := i.blockStorage.SeeBlock(ctx, block) + if err != nil { + return fmt.Errorf( + "%w: unable to encounter block to storage %s:%d", + err, + block.BlockIdentifier.Hash, + block.BlockIdentifier.Index, + ) + } + + ops := 0 + + // Close channels of all blocks waiting. + i.waiter.Lock() + for _, transaction := range block.Transactions { + ops += len(transaction.Operations) + txHash := transaction.TransactionIdentifier.Hash + val, ok := i.waiter.Get(txHash, false) + if !ok { + continue + } + + if val.channelClosed { + logger.Debugw( + "channel already closed", + "hash", block.BlockIdentifier.Hash, + "index", block.BlockIdentifier.Index, + "channel", txHash, + ) + continue + } + + // Closing channel will cause all listeners to continue + val.channelClosed = true + close(val.channel) + } + i.waiter.Unlock() + + logger.Debugw( + "block seen", + "hash", block.BlockIdentifier.Hash, + "index", block.BlockIdentifier.Index, + ) + + return nil +} + // BlockRemoved is called by the syncer when a block is removed. func (i *Indexer) BlockRemoved( ctx context.Context, @@ -424,6 +534,7 @@ func (i *Indexer) findCoin( coinIdentifier string, ) (*types.Coin, *types.AccountIdentifier, error) { for ctx.Err() == nil { + startSeen := i.seen databaseTransaction := i.database.ReadTransaction(ctx) defer databaseTransaction.Discard(ctx) @@ -461,6 +572,14 @@ func (i *Indexer) findCoin( return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier) } + // Check seen CoinCache + i.coinCacheMutex.Lock(false) + accCoin, ok := i.coinCache[coinIdentifier] + i.coinCacheMutex.Unlock() + if ok { + return accCoin.Coin, accCoin.Account, nil + } + // Locking here prevents us from adding sending any done // signals while we are determining whether or not to add // to the WaitTable. @@ -470,12 +589,13 @@ func (i *Indexer) findCoin( // we created our databaseTransaction. currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx) if err != nil { + i.waiter.Unlock() return nil, nil, fmt.Errorf("%w: unable to get head block identifier", err) } // If the block has changed, we try to look up the transaction // again. - if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) { + if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen { i.waiter.Unlock() continue } diff --git a/services/network_service_test.go b/services/network_service_test.go index ca3ce36b..74d238ca 100644 --- a/services/network_service_test.go +++ b/services/network_service_test.go @@ -27,7 +27,7 @@ import ( ) var ( - middlewareVersion = "0.0.8" + middlewareVersion = "0.0.9" defaultNetworkOptions = &types.NetworkOptionsResponse{ Version: &types.Version{ RosettaVersion: types.RosettaAPIVersion, diff --git a/services/types.go b/services/types.go index 2948b806..1937885d 100644 --- a/services/types.go +++ b/services/types.go @@ -45,7 +45,7 @@ const ( // variable instead of a constant because // we typically need the pointer of this // value. - MiddlewareVersion = "0.0.8" + MiddlewareVersion = "0.0.9" ) // Client is used by the servicers to get Peer information