From b339c1e620d712e7c8049af1f211a5231859004e Mon Sep 17 00:00:00 2001 From: Muhaimin Anando Date: Mon, 1 Nov 2021 20:49:05 +0600 Subject: [PATCH] Feature/reorg (#97) * Implement re-org feature Co-authored-by: meta-bot Co-authored-by: Atif Anowar --- consensus/consensus.go | 3 +++ consensus/pandora/api.go | 4 ++-- consensus/pandora/helpers.go | 5 ++++- consensus/pandora/pandora.go | 27 +++++++++--------------- consensus/pandora/pandora_test.go | 6 ++---- consensus/pandora/sealer.go | 23 ++++++++++---------- consensus/pandora/subscription.go | 35 +++++++++++++++++++++++-------- consensus/pandora/types.go | 7 +++++++ core/chain_makers.go | 1 + 9 files changed, 67 insertions(+), 44 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index e641f436f473..552100afac0a 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -56,6 +56,9 @@ type ChainReader interface { // CurrentBlock retrieves the current head block of the canonical chain. CurrentBlock() *types.Block + + // SetHead for reorg purpose. It will set head to the mentioned block number + SetHead(head uint64) error } // Engine is an algorithm agnostic consensus engine. diff --git a/consensus/pandora/api.go b/consensus/pandora/api.go index 426a590a67c7..e567baf34fc0 100644 --- a/consensus/pandora/api.go +++ b/consensus/pandora/api.go @@ -23,7 +23,7 @@ type API struct { // GetShardingWork returns a work package for external miner. func (api *API) GetShardingWork(parentHash common.Hash, blockNumber uint64, slotNumber uint64, epoch uint64) ([4]string, error) { - log.Debug(">>> GetShardingWork", "parentHash", parentHash, "blockNumber", blockNumber, "slot number", slotNumber, "epoch", epoch) + log.Debug("api: GetShardingWork", "parentHash", parentHash, "blockNumber", blockNumber, "slot number", slotNumber, "epoch", epoch) emptyRes := [4]string{} if api.pandora == nil { return emptyRes, errors.New("pandora engine not supported") @@ -54,7 +54,7 @@ func (api *API) GetShardingWork(parentHash common.Hash, blockNumber uint64, slot // Note either an invalid solution, a stale work a non-existent work will return false. // This submit work contains BLS storing feature. func (api *API) SubmitWorkBLS(nonce types.BlockNonce, hash common.Hash, hexSignatureString string) bool { - log.Trace(">>>>> SubmitworkBLS", "nonce", nonce, "hash", hash, "hexSignatureString", hexSignatureString) + log.Trace("api: SubmitWorkBLS", "nonce", nonce, "hash", hash, "hexSignatureString", hexSignatureString) if api.pandora == nil { return false } diff --git a/consensus/pandora/helpers.go b/consensus/pandora/helpers.go index 579752d71562..ef55461ed7e3 100644 --- a/consensus/pandora/helpers.go +++ b/consensus/pandora/helpers.go @@ -130,7 +130,10 @@ func (p *Pandora) VerifyBLSSignature(header *types.Header) error { curEpochInfo := p.getEpochInfo(extractedEpoch) if curEpochInfo == nil { log.Error("Epoch info not found in cache", "slot", extractedSlot, "epoch", extractedEpoch) - p.epochRequest <- extractedEpoch + if extractedEpoch < p.currentEpoch { + p.requestedEpoch = extractedEpoch + p.subscriptionErrCh <- consensus.ErrEpochNotFound + } return consensus.ErrEpochNotFound } diff --git a/consensus/pandora/pandora.go b/consensus/pandora/pandora.go index 4a79755d5bd2..4d0946db9a12 100644 --- a/consensus/pandora/pandora.go +++ b/consensus/pandora/pandora.go @@ -35,6 +35,7 @@ var ( errInvalidParentHash = errors.New("invalid parent hash") errInvalidBlockNumber = errors.New("invalid block number") errOlderBlockTime = errors.New("timestamp older than parent") + errInvalidBlsSignature = errors.New("Invalid bls signature submitted from validator") ) // DialRPCFn dials to the given endpoint @@ -72,7 +73,6 @@ type Pandora struct { epochInfosMu sync.RWMutex epochInfos *lru.Cache - epochRequest chan uint64 requestedEpoch uint64 } @@ -119,9 +119,8 @@ func New( fetchShardingInfoCh: make(chan *shardingInfoReq), submitShardingInfoCh: make(chan *shardingResult), newSealRequestCh: make(chan *sealTask), - subscriptionErrCh: make(chan error), + subscriptionErrCh: make(chan error, 1), works: make(map[common.Hash]*types.Block), - epochRequest: make(chan uint64), epochInfos: epochCache, // need to define maximum size. It will take maximum latest 100 epochs } } @@ -180,8 +179,8 @@ func (p *Pandora) getCurrentBlock() *types.Block { func (p *Pandora) setCurrentBlock(block *types.Block) { p.currentBlockMu.Lock() + defer p.currentBlockMu.Unlock() p.currentBlock = block - p.currentBlockMu.Unlock() } func (p *Pandora) updateBlockHeader(currentBlock *types.Block, slotNumber uint64, epoch uint64) [4]string { @@ -249,14 +248,6 @@ func (p *Pandora) run(done <-chan struct{}) { // then simply save the block into current block. We will use it again p.setCurrentBlock(sealRequest.block) - case expectedEpoch := <-p.epochRequest: - log.Debug("new epoch info is requested to download from orchestrator", "expected epoch", expectedEpoch, "already received epoch from", p.currentEpoch) - if expectedEpoch < p.currentEpoch { - // expected a previous epoch. so we should download them. we can just unsubscribe them and an error will occur which will auto reconnect again - p.requestedEpoch = expectedEpoch - p.subscription.Unsubscribe() - } - case shardingInfoReq := <-p.fetchShardingInfoCh: // Get sharding work API is called and we got slot number from vanguard currentBlock := p.getCurrentBlock() @@ -293,13 +284,15 @@ func (p *Pandora) run(done <-chan struct{}) { } case submitSignatureData := <-p.submitShardingInfoCh: - if p.submitWork(submitSignatureData.nonce, submitSignatureData.hash, submitSignatureData.blsSeal) { + status, err := p.submitWork(submitSignatureData.nonce, submitSignatureData.hash, submitSignatureData.blsSeal) + if status && err == nil { log.Debug("submitWork is successful", "nonce", submitSignatureData.nonce, "hash", submitSignatureData.hash) submitSignatureData.errc <- nil } else { - log.Debug("submitWork is failed", "nonce", submitSignatureData.nonce, "hash", submitSignatureData.hash, "signature", submitSignatureData.blsSeal, - "current block number", p.getCurrentBlock().NumberU64()) - submitSignatureData.errc <- errors.New("invalid submit work request") + log.Warn("submitWork has failed", "nonce", submitSignatureData.nonce, "hash", + submitSignatureData.hash, "signature", submitSignatureData.blsSeal, + "curBlockNum", p.getCurrentBlock().NumberU64(), "err", err.Error()) + submitSignatureData.errc <- err } case <-ticker.C: @@ -319,7 +312,7 @@ func (p *Pandora) run(done <-chan struct{}) { // TODO- We need a fall-back support to connect with other orchestrator node for verifying incoming blocks when own orchestrator is down // Try to check the connection and retry to establish the connection p.retryToConnectAndSubscribe(err) - continue + case <-done: p.isRunning = false p.runError = nil diff --git a/consensus/pandora/pandora_test.go b/consensus/pandora/pandora_test.go index c2a78e35f9a9..9cb01b0870b8 100644 --- a/consensus/pandora/pandora_test.go +++ b/consensus/pandora/pandora_test.go @@ -275,7 +275,7 @@ func TestPandora_Start(t *testing.T) { ticker.Stop() case err := <-errChannel: assert.NotNil(t, err) - assert.Equal(t, "invalid submit work request", err.Error()) + assert.Equal(t, "Work submitted but none pending", err.Error()) ticker.Stop() } }) @@ -356,12 +356,10 @@ func TestPandora_Start(t *testing.T) { pandoraEngine.endpoint = dummyEndpoint pandoraEngine.Start(nil) time.Sleep(time.Millisecond * 100) - dummyErr := fmt.Errorf("dummyErr") + dummyErr := error(nil) pandoraEngine.subscriptionErrCh <- dummyErr time.Sleep(reConPeriod) assert.Equal(t, dummyErr, pandoraEngine.runError) - time.Sleep(reConPeriod) - assert.NotEqual(t, dummyErr, pandoraEngine.runError) }) t.Run("should handle done event", func(t *testing.T) { diff --git a/consensus/pandora/sealer.go b/consensus/pandora/sealer.go index 31efffd7c659..84c1c625eb42 100644 --- a/consensus/pandora/sealer.go +++ b/consensus/pandora/sealer.go @@ -1,6 +1,7 @@ package pandora import ( + "github.com/pkg/errors" "time" "github.com/ethereum/go-ethereum/common" @@ -22,11 +23,11 @@ func (pan *Pandora) Seal(chain consensus.ChainHeaderReader, block *types.Block, return nil } -func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, blsSignatureBytes *BlsSignatureBytes) bool { +func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, blsSignatureBytes *BlsSignatureBytes) (bool, error) { currentBlock := pan.getCurrentBlock() if currentBlock == nil { log.Error("No block found while submitting work", "sealhash", sealHash) - return false + return false, errors.New("Current sharding block not found") } // Make sure the work submitted is present @@ -34,7 +35,7 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls if block == nil { log.Warn("Work submitted but none pending", "sealHash", sealHash, "blockNumber", currentBlock.NumberU64()) - return false + return false, errors.New("Work submitted but none pending") } // Verify the correctness of submitted result. header := block.Header() @@ -43,7 +44,7 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls if nil != err { log.Error("error while forming signature from bytes", "err", err, "methodName", "Seal", "blockNumber", header.Number) - return false + return false, errors.New("Invalid signature bytes") } pandoraExtraData := new(ExtraData) @@ -51,7 +52,7 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls if nil != err { log.Error("rlp decode failed while converting pandora Extra data", "error", err, "blockNumber", header.Number) - return false + return false, errors.New("RLP decode failed") } extraDataWithSignature.FromExtraDataAndSignature(*pandoraExtraData, blsSignature) @@ -59,7 +60,7 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls if nil != err { log.Error("Invalid extraData in header", "sealHash", sealHash, "err", err, "slot", pandoraExtraData.Slot, "blockNumber", header.Number) - return false + return false, errors.New("Invalid extraData in header") } start := time.Now() @@ -68,13 +69,13 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls log.Warn("Invalid bls signature submitted from validator", "sealHash", sealHash, "elapsed", common.PrettyDuration(time.Since(start)), "err", err, "slot", pandoraExtraData.Slot, "blockNumber", header.Number) - return false + return false, err } // Make sure the result channel is assigned. if pan.results == nil { log.Error("Pandora result channel is empty, submitted mining result is rejected") - return false + return false, errors.New("Pandora result channel is empty, submitted mining result is rejected") } log.Debug("Verified correct sharding info", "sealHash", sealHash, "elapsed", common.PrettyDuration(time.Since(start)), "slot", pandoraExtraData.Slot, "blockNumber", header.Number) @@ -88,14 +89,14 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls case pan.results <- solution: log.Debug("Sharding block submitted is acceptable", "number", solution.NumberU64(), "sealHash", sealHash, "hash", solution.Hash(), "slot", pandoraExtraData.Slot) - return true + return true, nil default: log.Warn("Sealing result is not read by worker", "mode", "remote", "sealHash", sealHash) - return false + return false, errors.New("Sealing result is not read by worker") } } // The submitted block is too old to accept, drop it. log.Warn("Sharding block submitted is too old", "number", solution.NumberU64(), "sealHash", sealHash, "hash", solution.Hash(), "slot", pandoraExtraData.Slot) - return false + return false, errors.New("Sharding block submitted is too old") } diff --git a/consensus/pandora/subscription.go b/consensus/pandora/subscription.go index f09c8f155f5a..9bcaa45660bd 100644 --- a/consensus/pandora/subscription.go +++ b/consensus/pandora/subscription.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" @@ -22,6 +24,7 @@ func (p *Pandora) waitForConnection() { if err = p.connectToOrchestrator(); err == nil { log.Info("Connected and subscribed to orchestrator client", "endpoint", p.endpoint) p.connected = true + p.runError = nil return } log.Warn("Could not connect or subscribe to orchestrator client", "err", err) @@ -115,12 +118,10 @@ func (p *Pandora) subscribe() (*rpc.ClientSubscription, error) { func (p *Pandora) retryToConnectAndSubscribe(err error) { p.runError = err p.connected = false - // Back off for a while before resuming dialing the pandora node. - time.Sleep(reConPeriod) - p.waitForConnection() + go p.waitForConnection() // Reset run error in the event of a successful connection. - p.runError = nil - p.requestedEpoch = 0 + //p.runError = nil + //p.requestedEpoch = 0 } // subscribePendingHeaders subscribes to pandora client from latest saved slot using given rpc client @@ -199,11 +200,27 @@ func (p *Pandora) processEpochInfo(info *EpochInfoPayload) error { epochInfo.ValidatorList[i] = pubKey } - // store epoch info in in-memeory cache - //if err := p.epochInfoCache.put(info.Epoch, epochInfo); err != nil { - // return err - //} p.setEpochInfo(epochInfo.Epoch, epochInfo) + if nil == info.ReorgInfo { + return nil + } + log.Info("reorg event received") + // reorg info is present so reorg is triggered in vanguard side + parentHash := common.BytesToHash(info.ReorgInfo.PanParentHash) + parentBlock := p.chain.GetHeaderByHash(parentHash) + if parentBlock != nil { + // it is an invalid behaviour. Pandora doesn't have the block that should be present + parentBlockNumber := parentBlock.Number.Uint64() + err := p.chain.SetHead(parentBlockNumber) + if err != nil { + log.Error("failed to revert to the mentioned block in reorg", "block number", parentBlockNumber, "block hash", parentHash, "error", err) + return err + } + } + // requested block is not present. Maybe the node is in previous block and received this Epoch. + // but we can just move on. + log.Info("failed to find block for reorg", "requested hash", parentHash) + return nil } diff --git a/consensus/pandora/types.go b/consensus/pandora/types.go index e24808d2162b..b1cc23f7682b 100644 --- a/consensus/pandora/types.go +++ b/consensus/pandora/types.go @@ -39,11 +39,18 @@ type EpochInfo struct { SlotTimeDuration time.Duration } +// Reorg holds reorg related information. Based on this info orchestrator can revert pandora blocks +type Reorg struct { + VanParentHash []byte `json:"van_parent_hash"` + PanParentHash []byte `json:"pan_parent_hash"` +} + type EpochInfoPayload struct { Epoch uint64 `json:"epoch"` // Epoch number ValidatorList [32]string `json:"validatorList"` // Validators public key list for specific epoch EpochTimeStart uint64 `json:"epochTimeStart"` SlotTimeDuration time.Duration `json:"slotTimeDuration"` + ReorgInfo *Reorg `json:"reorg_info"` } // ExtraData diff --git a/core/chain_makers.go b/core/chain_makers.go index 8611be8c089f..20aaf56aa41b 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -314,3 +314,4 @@ func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } func (cr *fakeChainReader) CurrentBlock() *types.Block { return nil } +func (bc *fakeChainReader) SetHead(head uint64) error { return nil }