blockdb: bound max deleted blocks per blockdb sync (algorand#5910)
algorandskiy authored Jan 23, 2024
1 parent a12324f commit 3490731
Showing 4 changed files with 137 additions and 3 deletions.
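
In short, blockQueue.syncer computes minToSave from the trackers and then drops old blocks below it; after a long catch-up that could mean an arbitrarily large deletion inside one sync pass. This change caps each pass at maxDeletionBatchSize (10,000) blocks above the earliest block still on disk, leaving the rest for later passes. Below is a minimal standalone sketch of the clamp; boundedMinToSave and the plain uint64 rounds are illustrative only, while the real code operates on basics.Round with basics.SubSaturate/basics.AddSaturate inside the syncer (see the diff that follows).

package main

import "fmt"

const maxDeletionBatchSize = 10_000

// boundedMinToSave is a hypothetical helper mirroring the new clamp: never
// delete more than maxDeletionBatchSize blocks in one sync pass, measured
// from the earliest block still stored in the block DB.
func boundedMinToSave(minToSave, earliest uint64) uint64 {
	if minToSave > earliest && minToSave-earliest > maxDeletionBatchSize {
		return earliest + maxDeletionBatchSize
	}
	return minToSave
}

func main() {
	// Worked example matching the new test: rounds 0..14,999 are stored and
	// nothing holds deletion back, so minToSave arrives as 15,000.
	fmt.Println(boundedMinToSave(15_000, 0)) // 10000: only the first batch is dropped
	// A tracker pinning minToSave at 5,000 is already under the batch limit.
	fmt.Println(boundedMinToSave(5_000, 0)) // 5000
}
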
17 changes: 17 additions & 0 deletions ledger/blockqueue.go
@@ -111,6 +111,8 @@ func (bq *blockQueue) stop() {
}
}

const maxDeletionBatchSize = 10_000

func (bq *blockQueue) syncer() {
bq.mu.Lock()
for {
@@ -164,6 +166,21 @@ func (bq *blockQueue) syncer() {
bq.mu.Unlock()

minToSave := bq.l.notifyCommit(committed)
var earliest basics.Round
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
earliest, err0 = blockdb.BlockEarliest(tx)
if err0 != nil {
bq.l.log.Warnf("blockQueue.syncer: BlockEarliest(): %v", err0)
}
return err0
})
if err == nil {
if basics.SubSaturate(minToSave, earliest) > maxDeletionBatchSize {
minToSave = basics.AddSaturate(earliest, maxDeletionBatchSize)
}
}

bfstart := time.Now()
ledgerSyncBlockforgetCount.Inc(nil)
err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
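The err == nil gate and the saturating arithmetic keep the clamp safe: if BlockEarliest fails, minToSave is used unchanged, and if earliest happens to sit at or above minToSave, the saturating subtraction yields 0 rather than a wrapped-around huge value, so the bound simply does not fire. A small sketch of what such saturating helpers do over unsigned rounds; these are illustrative stand-ins, not the basics package implementations.

package sketch

// subSaturate mirrors the idea behind basics.SubSaturate: clamp at zero
// instead of wrapping around when the subtrahend is larger.
func subSaturate(a, b uint64) uint64 {
	if a < b {
		return 0
	}
	return a - b
}

// addSaturate mirrors the idea behind basics.AddSaturate: clamp at the
// maximum value instead of overflowing.
func addSaturate(a, b uint64) uint64 {
	if a > ^uint64(0)-b {
		return ^uint64(0)
	}
	return a + b
}
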
115 changes: 115 additions & 0 deletions ledger/blockqueue_test.go
@@ -17,9 +17,12 @@
package ledger

import (
"context"
"database/sql"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

@@ -29,10 +32,12 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/blockdb"
ledgertesting "github.com/algorand/go-algorand/ledger/testing"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/db"
)

func randomBlock(r basics.Round) blockEntry {
@@ -128,3 +133,113 @@ func TestGetEncodedBlockCert(t *testing.T) {
expectedErr := &ledgercore.ErrNoEntry{}
require.True(t, errors.As(err, expectedErr))
}

// Using trackers here is not ideal, but at the moment there is no abstraction for the ledger.
type uptoTracker struct {
emptyTracker
}

// committedUpTo overrides the emptyTracker method and always reports 5,000 as the lowest round to retain.
func (t *uptoTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
return 5_000, basics.Round(0)
}

// TestBlockQueueSyncerDeletion ensures that the block queue syncer deletes no more than maxDeletionBatchSize blocks at a time
func TestBlockQueueSyncerDeletion(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

tests := []struct {
name string
expectedEarliest basics.Round
tracker ledgerTracker
}{
{"max_batch", maxDeletionBatchSize, nil}, // no trackers, max deletion
{"5k_tracker", 5_000, &uptoTracker{}}, // tracker sets minToSave to 5k
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {

const dbMem = true
blockDBs, err := db.OpenPair(t.Name()+".block.sqlite", dbMem)
require.NoError(t, err)

log := logging.TestingLog(t)
err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return initBlocksDB(tx, log, []bookkeeping.Block{}, false)
})
require.NoError(t, err)

// add 15k blocks
const maxBlocks = maxDeletionBatchSize + maxDeletionBatchSize/2 // 15_000
err = blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
for i := 0; i < maxBlocks; i++ {
err0 := blockdb.BlockPut(
tx,
bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: basics.Round(i)}},
agreement.Certificate{})
if err0 != nil {
return err0
}
}
return nil
})
require.NoError(t, err)

var earliest, latest basics.Round
err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
earliest, err0 = blockdb.BlockEarliest(tx)
if err0 != nil {
return err0
}
latest, err0 = blockdb.BlockLatest(tx)
return err0
})
require.NoError(t, err)
require.Equal(t, basics.Round(0), earliest)
require.Equal(t, basics.Round(maxBlocks-1), latest)

// trigger deletion and ensure no more than 10k blocks are gone
// make a minimal ledger for the blockqueue

l := &Ledger{
log: log,
blockDBs: blockDBs,
}
if test.tracker != nil {
l.trackers.trackers = append(l.trackers.trackers, test.tracker)
}
blockq, _ := newBlockQueue(l)
err = blockq.start()
require.NoError(t, err)

// add a block. Eventually the syncer will be called on an empty ledger,
// forcing deletion of all 15_000 rounds. The deletion scoping should limit it to 10_000 rounds instead.
err = blockq.putBlock(bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: maxBlocks}}, agreement.Certificate{})
require.NoError(t, err)

require.Eventually(t, func() bool {
var latest basics.Round
err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
latest, err0 = blockdb.BlockLatest(tx)
return err0
})
require.NoError(t, err)
return latest == maxBlocks
}, 1*time.Second, 10*time.Millisecond)

blockq.stop()

err = blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
earliest, err0 = blockdb.BlockEarliest(tx)
return err0
})
require.NoError(t, err)
require.Equal(t, test.expectedEarliest, earliest)
})
}
}
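
The new test can be run on its own with the usual Go tooling, for example go test ./ledger -run TestBlockQueueSyncerDeletion in a typical go-algorand checkout; it uses an in-memory block DB, so no extra setup should be needed.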
6 changes: 3 additions & 3 deletions ledger/ledger.go
@@ -173,7 +173,7 @@ func OpenLedger[T string | DirsAndPrefix](
start := time.Now()
ledgerInitblocksdbCount.Inc(nil)
err = l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return initBlocksDB(tx, l, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival)
return initBlocksDB(tx, l.log, []bookkeeping.Block{genesisInitState.Block}, cfg.Archival)
})
ledgerInitblocksdbMicros.AddMicrosecondsSince(start, nil)
if err != nil {
@@ -364,7 +364,7 @@ func (l *Ledger) setSynchronousMode(ctx context.Context, synchronousMode db.Sync
// initBlocksDB performs DB initialization:
// - creates and populates it with genesis blocks
// - ensures DB is in good shape for archival mode and resets it if not
func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchival bool) (err error) {
func initBlocksDB(tx *sql.Tx, log logging.Logger, initBlocks []bookkeeping.Block, isArchival bool) (err error) {
err = blockdb.BlockInit(tx, initBlocks)
if err != nil {
err = fmt.Errorf("initBlocksDB.blockInit %v", err)
@@ -382,7 +382,7 @@ func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchi
// Detect possible problem - an archival node needs all blocks but has only a subsequence of them
// So reset the DB and init it again
if earliest != basics.Round(0) {
l.log.Warnf("resetting blocks DB (earliest block is %v)", earliest)
log.Warnf("resetting blocks DB (earliest block is %v)", earliest)
err := blockdb.BlockResetDB(tx)
if err != nil {
err = fmt.Errorf("initBlocksDB.blockResetDB %v", err)
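The signature change in ledger.go is what makes the test above possible: initBlocksDB now takes a logging.Logger instead of a *Ledger, so a block DB can be populated without constructing a full ledger. A minimal sketch of such a caller, assuming the ledger package's existing imports; the newEmptyBlockDB name is hypothetical.

// newEmptyBlockDB initializes a block DB pair using only a logger,
// mirroring what TestBlockQueueSyncerDeletion does in its setup.
func newEmptyBlockDB(pair db.Pair, log logging.Logger) error {
	return pair.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
		return initBlocksDB(tx, log, []bookkeeping.Block{}, false)
	})
}
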
2 changes: 2 additions & 0 deletions ledger/ledger_test.go
@@ -2877,11 +2877,13 @@ func testVotersReloadFromDiskAfterOneStateProofCommitted(t *testing.T, cfg confi
}

triggerDeleteVoters(t, l, genesisInitState)
l.acctsOnline.voters.votersMu.Lock()
vtSnapshot := l.acctsOnline.voters.votersForRoundCache

// verifying that the tree for round 512 is still in the cache, but the tree for round 256 is evicted.
require.Contains(t, vtSnapshot, basics.Round(496))
require.NotContains(t, vtSnapshot, basics.Round(240))
l.acctsOnline.voters.votersMu.Unlock()

err = l.reloadLedger()
require.NoError(t, err)
