Made downloader more resilient to forks up to 500 prime blocks #2192

Merged: 2 commits, Oct 4, 2024
14 changes: 8 additions & 6 deletions core/worker.go
@@ -228,7 +228,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
resultCh: make(chan *types.WorkObject, resultQueueSize),
exitCh: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
orderTransactionCh: make(chan transactionOrderingInfo),
fillTransactionsRollingAverage: &RollingAverage{windowSize: 100},
logger: logger,
}
@@ -260,6 +259,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
worker.wg.Add(1)
go worker.asyncStateLoop()

worker.orderTransactionCh = make(chan transactionOrderingInfo)
worker.wg.Add(1)
go worker.transactionOrderingLoop()
}
@@ -608,11 +608,13 @@ func (w *worker) GeneratePendingHeader(block *types.WorkObject, fill bool, txs t
work.wo.Header().SetBaseFee(big.NewInt(0))
}

if !fromOrderedTransactionSet {
select {
case w.orderTransactionCh <- transactionOrderingInfo{work.txs, work.gasUsedAfterTransaction, block}:
default:
w.logger.Info("w.orderTransactionCh is full")
if nodeCtx == common.ZONE_CTX && w.hc.ProcessingState() {
if !fromOrderedTransactionSet {
select {
case w.orderTransactionCh <- transactionOrderingInfo{work.txs, work.gasUsedAfterTransaction, block}:
default:
w.logger.Debug("w.orderTransactionCh is full")
}
}
}

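The guarded send in GeneratePendingHeader above is the standard Go non-blocking hand-off: if no ordering loop is draining orderTransactionCh (or it is busy), the request is dropped rather than blocking header generation. A minimal sketch of the pattern, with illustrative names that are not from the Go-Quai codebase:

package main

import "fmt"

type orderingInfo struct{ height int }

func main() {
    ch := make(chan orderingInfo) // unbuffered, like orderTransactionCh

    // No goroutine is receiving on ch, so a plain send would block forever.
    // A select with a default arm turns the send into a best-effort hand-off.
    select {
    case ch <- orderingInfo{height: 42}:
        fmt.Println("handed off ordering work")
    default:
        fmt.Println("no receiver ready, dropping ordering request")
    }
}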
133 changes: 80 additions & 53 deletions quai/handler.go
@@ -1,6 +1,7 @@
package quai

import (
"errors"
"math/big"
"runtime/debug"
"sync"
@@ -19,19 +20,17 @@ const (
// c_missingBlockChanSize is the size of channel listening to the MissingBlockEvent
c_missingBlockChanSize = 60
// c_checkNextPrimeBlockInterval is the interval for checking the next Block in Prime
c_checkNextPrimeBlockInterval = 60 * time.Second
// c_txsChanSize is the size of channel listening to the new txs event
c_newTxsChanSize = 1000
// c_newWsChanSize is the size of channel listening to the new workobjectshare event
c_newWsChanSize = 10
c_checkNextPrimeBlockInterval = 30 * time.Second
// c_recentBlockReqCache is the size of the cache for the recent block requests
c_recentBlockReqCache = 1000
// c_recentBlockReqTimeout is the timeout for the recent block requests cache
c_recentBlockReqTimeout = 1 * time.Minute
// c_broadcastTransactionsInterval is the interval for broadcasting transactions
c_broadcastTransactionsInterval = 2 * time.Second
// c_maxTxBatchSize is the maximum number of transactions to broadcast at once
c_maxTxBatchSize = 100
// c_primeBlockSyncDepth is how far back the prime block downloading will start
c_primeBlockSyncDepth = 500
)

var (
ErrBlockAlreadyAppended = errors.New("block has already been appended")
)
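ErrBlockAlreadyAppended is a package-level sentinel error, so callers can branch on it with errors.Is instead of matching strings. A hedged sketch of that calling pattern; fetchBlock here is a hypothetical stand-in for the real handler method:

package main

import (
    "errors"
    "fmt"
)

var ErrBlockAlreadyAppended = errors.New("block has already been appended")

// fetchBlock is a stand-in for a downloader step that may find the
// block already on chain.
func fetchBlock(height int) error {
    if height < 100 { // pretend everything below 100 is already appended
        return ErrBlockAlreadyAppended
    }
    return nil
}

func main() {
    if err := fetchBlock(42); errors.Is(err, ErrBlockAlreadyAppended) {
        fmt.Println("sync can stop walking back; chain is known from here")
    }
}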

// handler manages the fetch requests from the core and tx pool, and also takes care of the tx broadcast
@@ -156,61 +155,89 @@ func (h *handler) checkNextPrimeBlock() {
for {
select {
case <-checkNextPrimeBlockTimer.C:
currentHeight := h.core.CurrentHeader().Number(h.nodeLocation.Context())
// To prevent the node from downloading chains that have been
// forked, every prime block request requires the peer to send
// the next 10 prime blocks; this way, we add a cost for
// forked or attacking peers
h.GetNextPrimeBlock(currentHeight)

go func() {
defer func() {
if r := recover(); r != nil {
h.logger.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Fatal("Go-Quai Panicked")
}
}()

// The downloading process starts from the tip of the prime
// chain, going back 10 blocks at a time and checking
// until we reach a point where we have already appended the
// block, then asking for the next 20 prime blocks
currentHeight := h.core.CurrentHeader().Number(h.nodeLocation.Context())
syncHeight := new(big.Int).Set(currentHeight)
for i := 0; i < c_primeBlockSyncDepth; i += protocol.C_NumPrimeBlocksToDownload {
h.logger.Info("Downloading prime blocks from syncHeight ", syncHeight)
// an error here means the prime block at this height already existed in the database
if err := h.GetNextPrimeBlock(syncHeight); err != nil {
// If i >= 2 * protocol.C_NumPrimeBlocksToDownload, the
// blocks that the node wanted have already been
// downloaded; otherwise, download the next 2 *
// protocol.C_NumPrimeBlocksToDownload blocks
if i < 2*protocol.C_NumPrimeBlocksToDownload {
h.GetNextPrimeBlock(syncHeight)
h.GetNextPrimeBlock(syncHeight.Add(syncHeight, big.NewInt(protocol.C_NumPrimeBlocksToDownload)))
}
break
}
syncHeight.Sub(syncHeight, big.NewInt(protocol.C_NumPrimeBlocksToDownload))
if syncHeight.Sign() == -1 {
break
}
}
}()
case <-h.quitCh:
return
}
}
}
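With c_primeBlockSyncDepth = 500 and batches of C_NumPrimeBlocksToDownload (10, going by the comments), the walk-back above makes at most 50 probes per timer tick before giving up. A simplified, self-contained sketch of that control flow, using plain int heights and a stubbed probe in place of GetNextPrimeBlock:

package main

import "fmt"

const (
    primeBlockSyncDepth      = 500 // how far back the walk-back may go
    numPrimeBlocksToDownload = 10  // batch size per request (assumed)
)

// probe is a stand-in for GetNextPrimeBlock: it returns an error once
// the height is already known locally.
func probe(height int) error {
    if height <= 970 { // pretend everything at or below 970 is appended
        return fmt.Errorf("block has already been appended")
    }
    return nil
}

func main() {
    syncHeight := 1000 // pretend tip of the prime chain
    for i := 0; i < primeBlockSyncDepth; i += numPrimeBlocksToDownload {
        if err := probe(syncHeight); err != nil {
            // Found the fork point: resume forward sync from here
            // instead of walking back any further.
            fmt.Println("resuming forward sync from height", syncHeight)
            break
        }
        syncHeight -= numPrimeBlocksToDownload
        if syncHeight < 0 {
            break
        }
    }
}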

func (h *handler) GetNextPrimeBlock(number *big.Int) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                h.logger.WithFields(log.Fields{
                    "error":      r,
                    "stacktrace": string(debug.Stack()),
                }).Fatal("Go-Quai Panicked")
            }
        }()
        // If the block hash for the requested number is not present in the
        // appended database, we ask the peer for the block with this hash
        resultCh := h.p2pBackend.Request(h.nodeLocation, new(big.Int).Add(number, big.NewInt(1)), []*types.WorkObjectBlockView{})
        blocks := <-resultCh
        if blocks != nil {
            // peer returns a slice of blocks from the requested number
            workObjects := blocks.([]*types.WorkObjectBlockView)
            if len(workObjects) != protocol.C_NumPrimeBlocksToDownload {
                h.logger.Error("did not get expected number of workobjects in prime")
                return
            }
            var parent *types.WorkObject
            for i, wo := range workObjects {
                if wo == nil {
                    h.logger.Error("one of the work objects is nil")
                    return
                }
                workObject := wo.WorkObject
                // Check that all the prime blocks form a continuous chain of blocks
                if i != 0 {
                    if workObject.ParentHash(common.PRIME_CTX) != parent.Hash() {
                        h.logger.Error("downloaded non continuous chain of prime blocks")
                        return
                    }
                }
                parent = workObject
            }

            // Write all the blocks after the sanity check into the database and
            // add them to the append queue
            for _, wo := range workObjects {
                h.core.WriteBlock(wo.WorkObject)
            }
        }
    }()
}

func (h *handler) GetNextPrimeBlock(number *big.Int) error {
    // If the block hash for the requested number is not present in the
    // appended database, we ask the peer for the block with this hash
    resultCh := h.p2pBackend.Request(h.nodeLocation, new(big.Int).Add(number, big.NewInt(1)), []*types.WorkObjectBlockView{})
    blocks := <-resultCh
    if blocks != nil {
        // peer returns a slice of blocks from the requested number
        workObjects := blocks.([]*types.WorkObjectBlockView)
        if len(workObjects) != protocol.C_NumPrimeBlocksToDownload {
            h.logger.Error("did not get expected number of workobjects in prime")
            return nil
        }
        var parent *types.WorkObject
        for i, wo := range workObjects {
            if wo == nil {
                h.logger.Error("one of the work objects is nil")
                return nil
            }
            workObject := wo.WorkObject
            // Check that all the prime blocks form a continuous chain of blocks
            if i != 0 {
                if workObject.ParentHash(common.PRIME_CTX) != parent.Hash() {
                    h.logger.Error("downloaded non continuous chain of prime blocks")
                    return nil
                }
            }
            parent = workObject
        }

        // Write all the blocks after the sanity check into the database and
        // add them to the append queue
        for _, wo := range workObjects {
            // If the work object is already on chain, return an error back and start the sync from that point
            block := h.core.GetBlockByHash(wo.WorkObjectHeader().Hash())
            if block != nil {
                return ErrBlockAlreadyAppended
            } else {
                h.core.WriteBlock(wo.WorkObject)
            }
        }
    }
    return nil
}
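The parent-hash check above is the core anti-fork defense: each block in the downloaded batch must link to the one before it, so a peer cannot splice unrelated blocks into the response. A minimal sketch of the same chain-continuity validation over a simplified block type (the types and hashing are illustrative, not Quai's):

package main

import (
    "crypto/sha256"
    "fmt"
)

type block struct {
    parentHash [32]byte
    payload    string
}

func (b block) hash() [32]byte {
    return sha256.Sum256(append(b.parentHash[:], b.payload...))
}

// continuous reports whether blocks[i].parentHash links to blocks[i-1]
// for every i > 0, mirroring the ParentHash/Hash check in the handler.
func continuous(blocks []block) bool {
    for i := 1; i < len(blocks); i++ {
        if blocks[i].parentHash != blocks[i-1].hash() {
            return false
        }
    }
    return true
}

func main() {
    b0 := block{payload: "genesis"}
    b1 := block{parentHash: b0.hash(), payload: "one"}
    b2 := block{parentHash: b1.hash(), payload: "two"}
    fmt.Println(continuous([]block{b0, b1, b2})) // true
}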