Skip to content

Commit

Permalink
indexer cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-ogrady committed Dec 9, 2020
1 parent d415efb commit c318263
Showing 1 changed file with 18 additions and 27 deletions.
45 changes: 18 additions & 27 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ const (

// overclockMultiplier is the amount
// we multiply runtime.NumCPU by to determine
// how many concurrent compressions we should
// perform when pre-storing block data.
// how many goroutines we should
// spwan to handle block data sequencing.
overclockMultiplier = 16

// semaphoreWeight is the weight of each semaphore request.
Expand Down Expand Up @@ -110,12 +110,17 @@ type Indexer struct {

waiter *waitTable

// store coins created in encountered before added
// Store coins created in pre-store before persisted
// in add block so we can optimistically populate
// blocks before committed.
coinCache map[string]*types.AccountCoin
coinCacheMutex *sdkUtils.PriorityMutex

encountered int64 // if increases, then retry coin lookup
encounteredMutex sync.Mutex
// When populating blocks using pre-stored blocks,
// we should retry if a new block was seen (similar
// to trying again if head block changes).
seen int64
seenMutex sync.Mutex

seenSemaphore *semaphore.Weighted
}
Expand Down Expand Up @@ -148,11 +153,11 @@ func defaultBadgerOptions(
opts.ValueLogLoadingMode = options.MemoryMap

// Use an extended table size for larger commits.
opts.MaxTableSize = 512 << 20
opts.MaxTableSize = database.DefaultMaxTableSize

// Smaller value log sizes means smaller contiguous memory allocations
// and less RAM usage on cleanup.
opts.ValueLogFileSize = 1024 << 20
opts.ValueLogFileSize = database.DefaultLogValueSize

// To allow writes at a faster speed, we create a new memtable as soon as
// an existing memtable is filled up. This option determines how many
Expand Down Expand Up @@ -343,8 +348,6 @@ func (i *Indexer) Prune(ctx context.Context) error {
func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
logger := utils.ExtractLogger(ctx, "indexer")

start := time.Now()

err := i.blockStorage.AddBlock(ctx, block)
if err != nil {
return fmt.Errorf(
Expand All @@ -355,8 +358,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
)
}

addBlockTime := time.Since(start)

ops := 0
for _, transaction := range block.Transactions {
ops += len(transaction.Operations)
Expand All @@ -379,8 +380,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
}
i.coinCacheMutex.Unlock()

cleanCacheTime := time.Since(start) - addBlockTime

// Look for all remaining waiting transactions associated
// with the next block that have not yet been closed. We should
// abort these waits as they will never be closed by a new transaction.
Expand All @@ -400,18 +399,12 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
}
i.waiter.Unlock()

waiterIteration := time.Since(start) - addBlockTime - cleanCacheTime

logger.Debugw(
"block added",
"hash", block.BlockIdentifier.Hash,
"index", block.BlockIdentifier.Index,
"transactions", len(block.Transactions),
"ops", ops,
"total add time", time.Since(start),
"add time", addBlockTime,
"clean cache time", cleanCacheTime,
"waiter iteration", waiterIteration,
)

return nil
Expand Down Expand Up @@ -451,9 +444,9 @@ func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error {
i.coinCacheMutex.Unlock()

// Update so that lookers know it exists
i.encounteredMutex.Lock()
i.encountered++
i.encounteredMutex.Unlock()
i.seenMutex.Lock()
i.seen++
i.seenMutex.Unlock()

err := i.blockStorage.SeeBlock(ctx, block)
if err != nil {
Expand Down Expand Up @@ -541,7 +534,7 @@ func (i *Indexer) findCoin(
coinIdentifier string,
) (*types.Coin, *types.AccountIdentifier, error) {
for ctx.Err() == nil {
startEncountered := i.encountered
startSeen := i.seen
databaseTransaction := i.database.ReadTransaction(ctx)
defer databaseTransaction.Discard(ctx)

Expand Down Expand Up @@ -579,9 +572,7 @@ func (i *Indexer) findCoin(
return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier)
}

// Check encounter table
// TODO: check encounter bank of coins to enable full pre-syncing
// otherwise will still be stuck
// Check seen CoinCache
i.coinCacheMutex.Lock(false)
accCoin, ok := i.coinCache[coinIdentifier]
i.coinCacheMutex.Unlock()
Expand All @@ -604,7 +595,7 @@ func (i *Indexer) findCoin(

// If the block has changed, we try to look up the transaction
// again.
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.encountered != startEncountered {
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen {
i.waiter.Unlock()
continue
}
Expand Down

0 comments on commit c318263

Please sign in to comment.