Skip to content

Commit

Permalink
Merge pull request coinbase#53 from coinbase/patrick/encounter-optimi…
Browse files Browse the repository at this point in the history
…zation

[indexer] Syncing Optimizations
  • Loading branch information
patrick-ogrady authored Dec 9, 2020
2 parents 7b5393d + 8eaa063 commit 1b4c984
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.13
require (
github.com/btcsuite/btcd v0.21.0-beta
github.com/btcsuite/btcutil v1.0.2
github.com/coinbase/rosetta-sdk-go v0.6.3
github.com/coinbase/rosetta-sdk-go v0.6.5
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/stretchr/testify v1.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coinbase/rosetta-sdk-go v0.6.3 h1:PPj14tPJ7SFc8sY/hlwK8zddT7PKwWU2wicxyerDxlg=
github.com/coinbase/rosetta-sdk-go v0.6.3/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo=
github.com/coinbase/rosetta-sdk-go v0.6.5 h1:RytFDCPXS64vEYwIOsxsoQGlZZyP9RQvzyYikxymI4w=
github.com/coinbase/rosetta-sdk-go v0.6.5/go.mod h1:MvQfsL2KlJ5786OdDviRIJE3agui2YcvS1CaQPDl1Yo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
180 changes: 150 additions & 30 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"time"

"github.com/coinbase/rosetta-bitcoin/bitcoin"
Expand All @@ -34,6 +36,7 @@ import (
sdkUtils "github.com/coinbase/rosetta-sdk-go/utils"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -58,6 +61,15 @@ const (

// zeroValue is 0 as a string
zeroValue = "0"

// overclockMultiplier is the amount
// we multiply runtime.NumCPU by to determine
// how many goroutines we should
// spwan to handle block data sequencing.
overclockMultiplier = 16

// semaphoreWeight is the weight of each semaphore request.
semaphoreWeight = int64(1)
)

var (
Expand Down Expand Up @@ -97,6 +109,20 @@ type Indexer struct {
workers []modules.BlockWorker

waiter *waitTable

// Store coins created in pre-store before persisted
// in add block so we can optimistically populate
// blocks before committed.
coinCache map[string]*types.AccountCoin
coinCacheMutex *sdkUtils.PriorityMutex

// When populating blocks using pre-stored blocks,
// we should retry if a new block was seen (similar
// to trying again if head block changes).
seen int64
seenMutex sync.Mutex

seenSemaphore *semaphore.Weighted
}

// CloseDatabase closes a storage.Database. This should be called
Expand Down Expand Up @@ -178,7 +204,7 @@ func Initialize(
return nil, fmt.Errorf("%w: unable to initialize storage", err)
}

blockStorage := modules.NewBlockStorage(localStore)
blockStorage := modules.NewBlockStorage(localStore, runtime.NumCPU()*overclockMultiplier)
asserter, err := asserter.NewClientWithOptions(
config.Network,
config.GenesisBlockIdentifier,
Expand All @@ -192,14 +218,17 @@ func Initialize(
}

i := &Indexer{
cancel: cancel,
network: config.Network,
pruningConfig: config.Pruning,
client: client,
database: localStore,
blockStorage: blockStorage,
waiter: newWaitTable(),
asserter: asserter,
cancel: cancel,
network: config.Network,
pruningConfig: config.Pruning,
client: client,
database: localStore,
blockStorage: blockStorage,
waiter: newWaitTable(),
asserter: asserter,
coinCache: map[string]*types.AccountCoin{},
coinCacheMutex: new(sdkUtils.PriorityMutex),
seenSemaphore: semaphore.NewWeighted(int64(runtime.NumCPU())),
}

coinStorage := modules.NewCoinStorage(
Expand Down Expand Up @@ -330,35 +359,31 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
}

ops := 0

// Close channels of all blocks waiting.
i.waiter.Lock()
for _, transaction := range block.Transactions {
ops += len(transaction.Operations)
txHash := transaction.TransactionIdentifier.Hash
val, ok := i.waiter.Get(txHash, false)
if !ok {
continue
}
}

if val.channelClosed {
logger.Debugw(
"channel already closed",
"hash", block.BlockIdentifier.Hash,
"index", block.BlockIdentifier.Index,
"channel", txHash,
)
continue
}
// clean cache intermediate
i.coinCacheMutex.Lock(true)
for _, tx := range block.Transactions {
for _, op := range tx.Operations {
if op.CoinChange == nil {
continue
}

// Closing channel will cause all listeners to continue
val.channelClosed = true
close(val.channel)
if op.CoinChange.CoinAction != types.CoinCreated {
continue
}

delete(i.coinCache, op.CoinChange.CoinIdentifier.Identifier)
}
}
i.coinCacheMutex.Unlock()

// Look for all remaining waiting transactions associated
// with the next block that have not yet been closed. We should
// abort these waits as they will never be closed by a new transaction.
i.waiter.Lock()
for txHash, val := range i.waiter.table {
if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed {
logger.Debugw(
Expand All @@ -385,6 +410,91 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
return nil
}

// BlockSeen is called by the syncer when a block is encountered.
func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error {
if err := i.seenSemaphore.Acquire(ctx, semaphoreWeight); err != nil {
return err
}
defer i.seenSemaphore.Release(semaphoreWeight)

logger := utils.ExtractLogger(ctx, "indexer")

// load intermediate
i.coinCacheMutex.Lock(false)
for _, tx := range block.Transactions {
for _, op := range tx.Operations {
if op.CoinChange == nil {
continue
}

// We only care about newly accessible coins.
if op.CoinChange.CoinAction != types.CoinCreated {
continue
}

i.coinCache[op.CoinChange.CoinIdentifier.Identifier] = &types.AccountCoin{
Account: op.Account,
Coin: &types.Coin{
CoinIdentifier: op.CoinChange.CoinIdentifier,
Amount: op.Amount,
},
}
}
}
i.coinCacheMutex.Unlock()

// Update so that lookers know it exists
i.seenMutex.Lock()
i.seen++
i.seenMutex.Unlock()

err := i.blockStorage.SeeBlock(ctx, block)
if err != nil {
return fmt.Errorf(
"%w: unable to encounter block to storage %s:%d",
err,
block.BlockIdentifier.Hash,
block.BlockIdentifier.Index,
)
}

ops := 0

// Close channels of all blocks waiting.
i.waiter.Lock()
for _, transaction := range block.Transactions {
ops += len(transaction.Operations)
txHash := transaction.TransactionIdentifier.Hash
val, ok := i.waiter.Get(txHash, false)
if !ok {
continue
}

if val.channelClosed {
logger.Debugw(
"channel already closed",
"hash", block.BlockIdentifier.Hash,
"index", block.BlockIdentifier.Index,
"channel", txHash,
)
continue
}

// Closing channel will cause all listeners to continue
val.channelClosed = true
close(val.channel)
}
i.waiter.Unlock()

logger.Debugw(
"block seen",
"hash", block.BlockIdentifier.Hash,
"index", block.BlockIdentifier.Index,
)

return nil
}

// BlockRemoved is called by the syncer when a block is removed.
func (i *Indexer) BlockRemoved(
ctx context.Context,
Expand Down Expand Up @@ -424,6 +534,7 @@ func (i *Indexer) findCoin(
coinIdentifier string,
) (*types.Coin, *types.AccountIdentifier, error) {
for ctx.Err() == nil {
startSeen := i.seen
databaseTransaction := i.database.ReadTransaction(ctx)
defer databaseTransaction.Discard(ctx)

Expand Down Expand Up @@ -461,6 +572,14 @@ func (i *Indexer) findCoin(
return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier)
}

// Check seen CoinCache
i.coinCacheMutex.Lock(false)
accCoin, ok := i.coinCache[coinIdentifier]
i.coinCacheMutex.Unlock()
if ok {
return accCoin.Coin, accCoin.Account, nil
}

// Locking here prevents us from adding sending any done
// signals while we are determining whether or not to add
// to the WaitTable.
Expand All @@ -470,12 +589,13 @@ func (i *Indexer) findCoin(
// we created our databaseTransaction.
currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
if err != nil {
i.waiter.Unlock()
return nil, nil, fmt.Errorf("%w: unable to get head block identifier", err)
}

// If the block has changed, we try to look up the transaction
// again.
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) {
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen {
i.waiter.Unlock()
continue
}
Expand Down
2 changes: 1 addition & 1 deletion services/network_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

var (
middlewareVersion = "0.0.8"
middlewareVersion = "0.0.9"
defaultNetworkOptions = &types.NetworkOptionsResponse{
Version: &types.Version{
RosettaVersion: types.RosettaAPIVersion,
Expand Down
2 changes: 1 addition & 1 deletion services/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
// variable instead of a constant because
// we typically need the pointer of this
// value.
MiddlewareVersion = "0.0.8"
MiddlewareVersion = "0.0.9"
)

// Client is used by the servicers to get Peer information
Expand Down

0 comments on commit 1b4c984

Please sign in to comment.