diff --git a/indexer/indexer.go b/indexer/indexer.go index dee142b9..9f8bb199 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -64,8 +64,8 @@ const ( // overclockMultiplier is the amount // we multiply runtime.NumCPU by to determine - // how many concurrent compressions we should - // perform when pre-storing block data. + // how many goroutines we should + // spwan to handle block data sequencing. overclockMultiplier = 16 // semaphoreWeight is the weight of each semaphore request. @@ -110,12 +110,17 @@ type Indexer struct { waiter *waitTable - // store coins created in encountered before added + // Store coins created in pre-store before persisted + // in add block so we can optimistically populate + // blocks before committed. coinCache map[string]*types.AccountCoin coinCacheMutex *sdkUtils.PriorityMutex - encountered int64 // if increases, then retry coin lookup - encounteredMutex sync.Mutex + // When populating blocks using pre-stored blocks, + // we should retry if a new block was seen (similar + // to trying again if head block changes). + seen int64 + seenMutex sync.Mutex seenSemaphore *semaphore.Weighted } @@ -148,11 +153,11 @@ func defaultBadgerOptions( opts.ValueLogLoadingMode = options.MemoryMap // Use an extended table size for larger commits. - opts.MaxTableSize = 512 << 20 + opts.MaxTableSize = database.DefaultMaxTableSize // Smaller value log sizes means smaller contiguous memory allocations // and less RAM usage on cleanup. - opts.ValueLogFileSize = 1024 << 20 + opts.ValueLogFileSize = database.DefaultLogValueSize // To allow writes at a faster speed, we create a new memtable as soon as // an existing memtable is filled up. This option determines how many @@ -343,8 +348,6 @@ func (i *Indexer) Prune(ctx context.Context) error { func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { logger := utils.ExtractLogger(ctx, "indexer") - start := time.Now() - err := i.blockStorage.AddBlock(ctx, block) if err != nil { return fmt.Errorf( @@ -355,8 +358,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { ) } - addBlockTime := time.Since(start) - ops := 0 for _, transaction := range block.Transactions { ops += len(transaction.Operations) @@ -379,8 +380,6 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.coinCacheMutex.Unlock() - cleanCacheTime := time.Since(start) - addBlockTime - // Look for all remaining waiting transactions associated // with the next block that have not yet been closed. We should // abort these waits as they will never be closed by a new transaction. @@ -400,18 +399,12 @@ func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { } i.waiter.Unlock() - waiterIteration := time.Since(start) - addBlockTime - cleanCacheTime - logger.Debugw( "block added", "hash", block.BlockIdentifier.Hash, "index", block.BlockIdentifier.Index, "transactions", len(block.Transactions), "ops", ops, - "total add time", time.Since(start), - "add time", addBlockTime, - "clean cache time", cleanCacheTime, - "waiter iteration", waiterIteration, ) return nil @@ -451,9 +444,9 @@ func (i *Indexer) BlockSeen(ctx context.Context, block *types.Block) error { i.coinCacheMutex.Unlock() // Update so that lookers know it exists - i.encounteredMutex.Lock() - i.encountered++ - i.encounteredMutex.Unlock() + i.seenMutex.Lock() + i.seen++ + i.seenMutex.Unlock() err := i.blockStorage.SeeBlock(ctx, block) if err != nil { @@ -541,7 +534,7 @@ func (i *Indexer) findCoin( coinIdentifier string, ) (*types.Coin, *types.AccountIdentifier, error) { for ctx.Err() == nil { - startEncountered := i.encountered + startSeen := i.seen databaseTransaction := i.database.ReadTransaction(ctx) defer databaseTransaction.Discard(ctx) @@ -579,9 +572,7 @@ func (i *Indexer) findCoin( return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier) } - // Check encounter table - // TODO: check encounter bank of coins to enable full pre-syncing - // otherwise will still be stuck + // Check seen CoinCache i.coinCacheMutex.Lock(false) accCoin, ok := i.coinCache[coinIdentifier] i.coinCacheMutex.Unlock() @@ -604,7 +595,7 @@ func (i *Indexer) findCoin( // If the block has changed, we try to look up the transaction // again. - if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.encountered != startEncountered { + if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) || i.seen != startSeen { i.waiter.Unlock() continue }