Skip to content

Commit

Permalink
Feature/reorg (#97)
Browse files Browse the repository at this point in the history
* Implement re-org feature

Co-authored-by: meta-bot <[email protected]>
Co-authored-by: Atif Anowar <[email protected]>
  • Loading branch information
3 people authored Nov 1, 2021
1 parent d674399 commit b339c1e
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 44 deletions.
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type ChainReader interface {

// CurrentBlock retrieves the current head block of the canonical chain.
CurrentBlock() *types.Block

// SetHead sets the head of the chain to the given block number. It is used for reorg handling.
SetHead(head uint64) error
}

// Engine is an algorithm agnostic consensus engine.
Expand Down
4 changes: 2 additions & 2 deletions consensus/pandora/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type API struct {

// GetShardingWork returns a work package for external miner.
func (api *API) GetShardingWork(parentHash common.Hash, blockNumber uint64, slotNumber uint64, epoch uint64) ([4]string, error) {
log.Debug(">>> GetShardingWork", "parentHash", parentHash, "blockNumber", blockNumber, "slot number", slotNumber, "epoch", epoch)
log.Debug("api: GetShardingWork", "parentHash", parentHash, "blockNumber", blockNumber, "slot number", slotNumber, "epoch", epoch)
emptyRes := [4]string{}
if api.pandora == nil {
return emptyRes, errors.New("pandora engine not supported")
Expand Down Expand Up @@ -54,7 +54,7 @@ func (api *API) GetShardingWork(parentHash common.Hash, blockNumber uint64, slot
// Note: an invalid solution, stale work, or non-existent work will all return false.
// This submit work also supports storing the BLS signature.
func (api *API) SubmitWorkBLS(nonce types.BlockNonce, hash common.Hash, hexSignatureString string) bool {
log.Trace(">>>>> SubmitworkBLS", "nonce", nonce, "hash", hash, "hexSignatureString", hexSignatureString)
log.Trace("api: SubmitWorkBLS", "nonce", nonce, "hash", hash, "hexSignatureString", hexSignatureString)
if api.pandora == nil {
return false
}
Expand Down
5 changes: 4 additions & 1 deletion consensus/pandora/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ func (p *Pandora) VerifyBLSSignature(header *types.Header) error {
curEpochInfo := p.getEpochInfo(extractedEpoch)
if curEpochInfo == nil {
log.Error("Epoch info not found in cache", "slot", extractedSlot, "epoch", extractedEpoch)
p.epochRequest <- extractedEpoch
if extractedEpoch < p.currentEpoch {
p.requestedEpoch = extractedEpoch
p.subscriptionErrCh <- consensus.ErrEpochNotFound
}
return consensus.ErrEpochNotFound
}

Expand Down
27 changes: 10 additions & 17 deletions consensus/pandora/pandora.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
errInvalidParentHash = errors.New("invalid parent hash")
errInvalidBlockNumber = errors.New("invalid block number")
errOlderBlockTime = errors.New("timestamp older than parent")
errInvalidBlsSignature = errors.New("Invalid bls signature submitted from validator")
)

// DialRPCFn dials to the given endpoint
Expand Down Expand Up @@ -72,7 +73,6 @@ type Pandora struct {

epochInfosMu sync.RWMutex
epochInfos *lru.Cache
epochRequest chan uint64
requestedEpoch uint64
}

Expand Down Expand Up @@ -119,9 +119,8 @@ func New(
fetchShardingInfoCh: make(chan *shardingInfoReq),
submitShardingInfoCh: make(chan *shardingResult),
newSealRequestCh: make(chan *sealTask),
subscriptionErrCh: make(chan error),
subscriptionErrCh: make(chan error, 1),
works: make(map[common.Hash]*types.Block),
epochRequest: make(chan uint64),
epochInfos: epochCache, // need to define maximum size. It will take maximum latest 100 epochs
}
}
Expand Down Expand Up @@ -180,8 +179,8 @@ func (p *Pandora) getCurrentBlock() *types.Block {

// setCurrentBlock stores block as the engine's current block while holding
// currentBlockMu.
func (p *Pandora) setCurrentBlock(block *types.Block) {
	p.currentBlockMu.Lock()
	// Release the lock exactly once. Combining a deferred Unlock with a second
	// explicit Unlock would unlock an already unlocked mutex, which is a fatal
	// run-time error.
	defer p.currentBlockMu.Unlock()
	p.currentBlock = block
}

func (p *Pandora) updateBlockHeader(currentBlock *types.Block, slotNumber uint64, epoch uint64) [4]string {
Expand Down Expand Up @@ -249,14 +248,6 @@ func (p *Pandora) run(done <-chan struct{}) {
// then simply save the block into current block. We will use it again
p.setCurrentBlock(sealRequest.block)

case expectedEpoch := <-p.epochRequest:
log.Debug("new epoch info is requested to download from orchestrator", "expected epoch", expectedEpoch, "already received epoch from", p.currentEpoch)
if expectedEpoch < p.currentEpoch {
// expected a previous epoch. so we should download them. we can just unsubscribe them and an error will occur which will auto reconnect again
p.requestedEpoch = expectedEpoch
p.subscription.Unsubscribe()
}

case shardingInfoReq := <-p.fetchShardingInfoCh:
// Get sharding work API is called and we got slot number from vanguard
currentBlock := p.getCurrentBlock()
Expand Down Expand Up @@ -293,13 +284,15 @@ func (p *Pandora) run(done <-chan struct{}) {
}

case submitSignatureData := <-p.submitShardingInfoCh:
if p.submitWork(submitSignatureData.nonce, submitSignatureData.hash, submitSignatureData.blsSeal) {
status, err := p.submitWork(submitSignatureData.nonce, submitSignatureData.hash, submitSignatureData.blsSeal)
if status && err == nil {
log.Debug("submitWork is successful", "nonce", submitSignatureData.nonce, "hash", submitSignatureData.hash)
submitSignatureData.errc <- nil
} else {
log.Debug("submitWork is failed", "nonce", submitSignatureData.nonce, "hash", submitSignatureData.hash, "signature", submitSignatureData.blsSeal,
"current block number", p.getCurrentBlock().NumberU64())
submitSignatureData.errc <- errors.New("invalid submit work request")
log.Warn("submitWork has failed", "nonce", submitSignatureData.nonce, "hash",
submitSignatureData.hash, "signature", submitSignatureData.blsSeal,
"curBlockNum", p.getCurrentBlock().NumberU64(), "err", err.Error())
submitSignatureData.errc <- err
}

case <-ticker.C:
Expand All @@ -319,7 +312,7 @@ func (p *Pandora) run(done <-chan struct{}) {
// TODO- We need a fall-back support to connect with other orchestrator node for verifying incoming blocks when own orchestrator is down
// Try to check the connection and retry to establish the connection
p.retryToConnectAndSubscribe(err)
continue

case <-done:
p.isRunning = false
p.runError = nil
Expand Down
6 changes: 2 additions & 4 deletions consensus/pandora/pandora_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestPandora_Start(t *testing.T) {
ticker.Stop()
case err := <-errChannel:
assert.NotNil(t, err)
assert.Equal(t, "invalid submit work request", err.Error())
assert.Equal(t, "Work submitted but none pending", err.Error())
ticker.Stop()
}
})
Expand Down Expand Up @@ -356,12 +356,10 @@ func TestPandora_Start(t *testing.T) {
pandoraEngine.endpoint = dummyEndpoint
pandoraEngine.Start(nil)
time.Sleep(time.Millisecond * 100)
dummyErr := fmt.Errorf("dummyErr")
dummyErr := error(nil)
pandoraEngine.subscriptionErrCh <- dummyErr
time.Sleep(reConPeriod)
assert.Equal(t, dummyErr, pandoraEngine.runError)
time.Sleep(reConPeriod)
assert.NotEqual(t, dummyErr, pandoraEngine.runError)
})

t.Run("should handle done event", func(t *testing.T) {
Expand Down
23 changes: 12 additions & 11 deletions consensus/pandora/sealer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pandora

import (
"github.com/pkg/errors"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,19 +23,19 @@ func (pan *Pandora) Seal(chain consensus.ChainHeaderReader, block *types.Block,
return nil
}

func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, blsSignatureBytes *BlsSignatureBytes) bool {
func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, blsSignatureBytes *BlsSignatureBytes) (bool, error) {
currentBlock := pan.getCurrentBlock()
if currentBlock == nil {
log.Error("No block found while submitting work", "sealhash", sealHash)
return false
return false, errors.New("Current sharding block not found")
}

// Make sure the work submitted is present
block := pan.works[sealHash]
if block == nil {
log.Warn("Work submitted but none pending", "sealHash", sealHash,
"blockNumber", currentBlock.NumberU64())
return false
return false, errors.New("Work submitted but none pending")
}
// Verify the correctness of submitted result.
header := block.Header()
Expand All @@ -43,23 +44,23 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls
if nil != err {
log.Error("error while forming signature from bytes", "err", err,
"methodName", "Seal", "blockNumber", header.Number)
return false
return false, errors.New("Invalid signature bytes")
}

pandoraExtraData := new(ExtraData)
err = rlp.DecodeBytes(header.Extra, pandoraExtraData)
if nil != err {
log.Error("rlp decode failed while converting pandora Extra data", "error", err,
"blockNumber", header.Number)
return false
return false, errors.New("RLP decode failed")
}

extraDataWithSignature.FromExtraDataAndSignature(*pandoraExtraData, blsSignature)
header.Extra, err = rlp.EncodeToBytes(extraDataWithSignature)
if nil != err {
log.Error("Invalid extraData in header", "sealHash", sealHash, "err", err,
"slot", pandoraExtraData.Slot, "blockNumber", header.Number)
return false
return false, errors.New("Invalid extraData in header")
}

start := time.Now()
Expand All @@ -68,13 +69,13 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls
log.Warn("Invalid bls signature submitted from validator",
"sealHash", sealHash, "elapsed", common.PrettyDuration(time.Since(start)),
"err", err, "slot", pandoraExtraData.Slot, "blockNumber", header.Number)
return false
return false, err
}

// Make sure the result channel is assigned.
if pan.results == nil {
log.Error("Pandora result channel is empty, submitted mining result is rejected")
return false
return false, errors.New("Pandora result channel is empty, submitted mining result is rejected")
}
log.Debug("Verified correct sharding info", "sealHash", sealHash,
"elapsed", common.PrettyDuration(time.Since(start)), "slot", pandoraExtraData.Slot, "blockNumber", header.Number)
Expand All @@ -88,14 +89,14 @@ func (pan *Pandora) submitWork(nonce types.BlockNonce, sealHash common.Hash, bls
case pan.results <- solution:
log.Debug("Sharding block submitted is acceptable", "number", solution.NumberU64(),
"sealHash", sealHash, "hash", solution.Hash(), "slot", pandoraExtraData.Slot)
return true
return true, nil
default:
log.Warn("Sealing result is not read by worker", "mode", "remote", "sealHash", sealHash)
return false
return false, errors.New("Sealing result is not read by worker")
}
}
// The submitted block is too old to accept, drop it.
log.Warn("Sharding block submitted is too old", "number", solution.NumberU64(),
"sealHash", sealHash, "hash", solution.Hash(), "slot", pandoraExtraData.Slot)
return false
return false, errors.New("Sharding block submitted is too old")
}
35 changes: 26 additions & 9 deletions consensus/pandora/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -22,6 +24,7 @@ func (p *Pandora) waitForConnection() {
if err = p.connectToOrchestrator(); err == nil {
log.Info("Connected and subscribed to orchestrator client", "endpoint", p.endpoint)
p.connected = true
p.runError = nil
return
}
log.Warn("Could not connect or subscribe to orchestrator client", "err", err)
Expand Down Expand Up @@ -115,12 +118,10 @@ func (p *Pandora) subscribe() (*rpc.ClientSubscription, error) {
// retryToConnectAndSubscribe records err as the current run error, marks the
// engine as disconnected and starts a single reconnection attempt in the
// background.
//
// The reconnection is carried out by waitForConnection in its own goroutine
// so the caller is not blocked while the connection is re-established.
// waitForConnection itself sets connected and clears runError once it has
// connected and subscribed, so neither field is reset here.
func (p *Pandora) retryToConnectAndSubscribe(err error) {
	p.runError = err
	p.connected = false

	// Start exactly one attempt. Running a blocking attempt and then a second
	// one in a goroutine would have two callers writing connected/runError.
	//
	// NOTE(review): requestedEpoch is deliberately not reset here; presumably
	// the asynchronous re-subscription still needs it, and clearing it right
	// after starting the goroutine would race with that read — confirm.
	go p.waitForConnection()
}

// subscribePendingHeaders subscribes to pandora client from latest saved slot using given rpc client
Expand Down Expand Up @@ -199,11 +200,27 @@ func (p *Pandora) processEpochInfo(info *EpochInfoPayload) error {
epochInfo.ValidatorList[i] = pubKey
}

// store epoch info in the in-memory cache
//if err := p.epochInfoCache.put(info.Epoch, epochInfo); err != nil {
// return err
//}
p.setEpochInfo(epochInfo.Epoch, epochInfo)

if nil == info.ReorgInfo {
return nil
}
log.Info("reorg event received")
// reorg info is present so reorg is triggered in vanguard side
parentHash := common.BytesToHash(info.ReorgInfo.PanParentHash)
parentBlock := p.chain.GetHeaderByHash(parentHash)
if parentBlock != nil {
// it is an invalid behaviour. Pandora doesn't have the block that should be present
parentBlockNumber := parentBlock.Number.Uint64()
err := p.chain.SetHead(parentBlockNumber)
if err != nil {
log.Error("failed to revert to the mentioned block in reorg", "block number", parentBlockNumber, "block hash", parentHash, "error", err)
return err
}
}
// requested block is not present. Maybe the node is in previous block and received this Epoch.
// but we can just move on.
log.Info("failed to find block for reorg", "requested hash", parentHash)

return nil
}
7 changes: 7 additions & 0 deletions consensus/pandora/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ type EpochInfo struct {
SlotTimeDuration time.Duration
}

// Reorg holds reorg related information carried inside an EpochInfoPayload.
// When it is present, pandora looks up the header identified by PanParentHash
// and, if that header is known locally, sets its chain head to that block number.
type Reorg struct {
// presumably the parent hash on the vanguard side; it is not read by the visible code — TODO confirm
VanParentHash []byte `json:"van_parent_hash"`
// hash of the pandora block whose number the chain head is set to on reorg
PanParentHash []byte `json:"pan_parent_hash"`
}

// EpochInfoPayload is the epoch information payload handled by
// processEpochInfo. ReorgInfo is optional: when it is nil, no reorg handling
// is performed for the payload.
type EpochInfoPayload struct {
Epoch uint64 `json:"epoch"` // Epoch number
ValidatorList [32]string `json:"validatorList"` // Validators public key list for specific epoch
// presumably the start time of the epoch — TODO confirm the unit (looks like a unix timestamp)
EpochTimeStart uint64 `json:"epochTimeStart"`
// presumably the length of a single slot — TODO confirm
SlotTimeDuration time.Duration `json:"slotTimeDuration"`
// Reorg details; nil when the payload does not request a reorg.
// NOTE(review): this tag is snake_case while the other fields use camelCase — confirm it matches the sender's encoding.
ReorgInfo *Reorg `json:"reorg_info"`
}

// ExtraData
Expand Down
1 change: 1 addition & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,4 @@ func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header
// GetHeader is a stub that ignores its arguments and always returns nil.
func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil }

// GetBlock is a stub that ignores its arguments and always returns nil.
func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil }

// CurrentBlock is a stub that always returns nil.
func (cr *fakeChainReader) CurrentBlock() *types.Block { return nil }

// SetHead is a no-op stub that always returns a nil error. The receiver is
// named cr to match the other fakeChainReader methods.
func (cr *fakeChainReader) SetHead(head uint64) error { return nil }

0 comments on commit b339c1e

Please sign in to comment.