
Implement reorg logic and trigger reorg
atif-konasl committed Dec 8, 2021
1 parent 14ef44f commit e1736fe
Showing 13 changed files with 189 additions and 148 deletions.
89 changes: 34 additions & 55 deletions orchestrator/consensus/handler.go
@@ -8,25 +8,23 @@ import (
"github.com/lukso-network/lukso-orchestrator/shared/types"
)

// processPandoraHeader processes an incoming pandora shard header from the pandora chain.
// - First it checks the pandora header hash in the verified shard info db. If it's already in the db then it's already verified, so return nil.
// - If it is not in the verified db, this method looks for the vanguard shard in the pending cache.
// - If the vanguard shard is already in the pending cache, it calls insertIntoChain to verify the sharding info,
// check consecutiveness and trigger a reorg if the vanguard block's parent hash does not match the latest verified slot's hash.
func (s *Service) processPandoraHeader(headerInfo *types.PandoraHeaderInfo) error {
slot := headerInfo.Slot

// short circuit check, if this header is already in verified sharding info db then send confirmation instantly
if shardInfo := s.getShardingInfoInDB(slot); shardInfo != nil {
if len(shardInfo.Shards) > 0 {
blocks := shardInfo.Shards[0].Blocks
if len(blocks) > 0 && blocks[0].HeaderRoot == headerInfo.Header.Hash() {
log.WithField("shardInfo", shardInfo).Debug("Pan header is already in verified shard info db")

s.verifiedSlotInfoFeed.Send(&types.SlotInfoWithStatus{
VanguardBlockHash: shardInfo.SlotInfo.BlockRoot,
PandoraHeaderHash: blocks[0].HeaderRoot,
Status: types.Verified,
})

return nil
}
if shardInfo := s.getShardingInfo(slot); shardInfo != nil {
if len(shardInfo.Shards) > 0 && len(shardInfo.Shards[0].Blocks) > 0 && shardInfo.Shards[0].Blocks[0].HeaderRoot == headerInfo.Header.Hash() {
log.WithField("shardInfo", shardInfo).Debug("Pandora shard header is already in verified shard info db")
s.verifiedSlotInfoFeed.Send(&types.SlotInfoWithStatus{
VanguardBlockHash: shardInfo.SlotInfo.BlockRoot,
PandoraHeaderHash: shardInfo.Shards[0].Blocks[0].HeaderRoot,
Status: types.Verified,
})
return nil
}
}

@@ -45,7 +43,7 @@ func (s *Service) processVanguardShardInfo(vanShardInfo *types.VanguardShardInfo
slot := vanShardInfo.Slot

// short circuit check, if this shard info is already in the verified sharding info db then skip further processing
if shardInfo := s.getShardingInfoInDB(slot); shardInfo != nil {
if shardInfo := s.getShardingInfo(slot); shardInfo != nil {
if shardInfo.SlotInfo != nil && shardInfo.SlotInfo.BlockRoot != common.BytesToHash(vanShardInfo.BlockHash) {
log.WithField("shardInfo", shardInfo).Debug("Van header is already in verified shard info db")
return nil
@@ -66,7 +64,11 @@ func (s *Service) processVanguardShardInfo(vanShardInfo *types.VanguardShardInfo
// - write into db
// - send status to pandora chain
func (s *Service) insertIntoChain(vanShardInfo *types.VanguardShardInfo, header *eth1Types.Header) error {
status := s.verifyShardingInfo(vanShardInfo, header)
status, err := s.verifyShardingInfo(vanShardInfo, header)
if err != nil {
return err
}

confirmationStatus := &types.SlotInfoWithStatus{
PandoraHeaderHash: header.Hash(),
VanguardBlockHash: common.BytesToHash(vanShardInfo.BlockHash[:]),
@@ -96,19 +98,26 @@ func (s *Service) insertIntoChain(vanShardInfo *types.VanguardShardInfo, header
// - sharding info between incoming vanguard and pandora sharding infos
// - checks consecutive parent hash of vanguard and pandora sharding hash
// - if parent hash of vanguard block does not match with latest verified slot then trigger reorg
func (s *Service) verifyShardingInfo(vanShardInfo *types.VanguardShardInfo, header *eth1Types.Header) bool {
func (s *Service) verifyShardingInfo(vanShardInfo *types.VanguardShardInfo, header *eth1Types.Header) (bool, error) {
// Compare sharding info between the incoming vanguard and pandora block headers
if !CompareShardingInfo(header, vanShardInfo.ShardInfo) {
return false
if !compareShardingInfo(header, vanShardInfo.ShardInfo) {
return false, nil
}

// TODO- Checking block consecutive of pandora sharding info and vanguard slot info
// TODO- Trigger reorg if slot consecutive fails
if verificationStatus, triggerReorg := s.verifyConsecutiveHashes(header, vanShardInfo); !verificationStatus {
if triggerReorg {
log.Info("Reorg triggered")
if err := s.processReorg(common.BytesToHash(vanShardInfo.ParentHash)); err != nil {
return false, err
}
}
}

return true
return true, nil
}

func (s *Service) getShardingInfoInDB(slot uint64) *types.MultiShardInfo {
func (s *Service) getShardingInfo(slot uint64) *types.MultiShardInfo {
// Retrieve the step id for this slot from the verified slot info db
stepId, err := s.db.GetStepIdBySlot(slot)
if err != nil {
Expand Down Expand Up @@ -168,33 +177,3 @@ func (s *Service) writeFinalizeInfo(finalizeSlot, finalizeEpoch uint64) {
}
}
}

// reorgDB
func (s *Service) reorgDB(revertSlot uint64) error {
// Removing slot infos from verified slot info db
stepId, err := s.db.GetStepIdBySlot(revertSlot)
if err != nil {
log.WithError(err).WithField("revertSlot", revertSlot).Error("Could not found step id from DB during reorg")
return err
}

shardInfo, err := s.db.VerifiedShardInfo(stepId)
if err != nil {
log.WithError(err).WithField("stepId", stepId).Error("Could not found shard info from DB during reorg")
return err
}

if stepId > 0 && shardInfo != nil {
if err := s.db.RemoveShardingInfos(stepId); err != nil {
log.WithError(err).Error("Could not revert shard info from DB during reorg")
return err
}
}

if err := s.db.SaveLatestStepID(stepId); err != nil {
log.WithError(err).Error("Could not store latest step id during reorg")
return err
}

return nil
}
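The verifyShardingInfo signature change above (bool to (bool, error)) separates a plain sharding-info mismatch from a hard failure during reorg. Below is a minimal, self-contained sketch of that two-value pattern — a toy function under assumed inputs, not the repository's code:

package main

import (
	"errors"
	"fmt"
)

var errUnknownParent = errors.New("unknown parent")

// verify mirrors the two-value shape of verifyShardingInfo: a sharding-info
// mismatch yields (false, nil) so the caller just rejects the block, while a
// failed reorg yields (false, err) so the caller aborts processing.
func verify(infoMatches, parentsConsecutive, reorgSucceeds bool) (bool, error) {
	if !infoMatches {
		return false, nil
	}
	if !parentsConsecutive && !reorgSucceeds {
		return false, errUnknownParent
	}
	return true, nil
}

func main() {
	ok, err := verify(true, false, false)
	fmt.Println(ok, err) // false unknown parent
}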
63 changes: 63 additions & 0 deletions orchestrator/consensus/reorg.go
@@ -0,0 +1,63 @@
package consensus

import (
"github.com/ethereum/go-ethereum/common"
"github.com/lukso-network/lukso-orchestrator/shared/types"
"github.com/pkg/errors"
"sync/atomic"
)

// processReorg reverts the verified shard info db back to the finalized checkpoint, purges the pending caches and resets both chain subscriptions
func (s *Service) processReorg(parentVanBlkHash common.Hash) error {
atomic.StoreUint32(&s.reorgInProgress, 1)
defer func() {
atomic.StoreUint32(&s.reorgInProgress, 0)
}()

finalizedSlot := s.db.FinalizedSlot()
finalizedStepId, err := s.db.GetStepIdBySlot(finalizedSlot)
if err != nil {
log.WithError(err).WithField("finalizedSlot", finalizedSlot).WithField("latestFinalizedStepId", finalizedStepId).
Error("Could not found step id from DB")
return err
}

latestStepId := s.db.LatestStepID()
parentShardInfo, err := s.db.FindAncestor(latestStepId, finalizedStepId, parentVanBlkHash)
if err != nil || parentShardInfo == nil {
log.WithField("finalizedSlot", finalizedSlot).WithField("latestFinalizedStepId", finalizedStepId).
Error("Could not found parent shard info from DB")
return errors.Wrap(errUnknownParent, "Failed to process reorg")
}

if len(parentShardInfo.Shards) == 0 || len(parentShardInfo.Shards[0].Blocks) == 0 {
log.WithField("finalizedSlot", finalizedSlot).WithField("latestFinalizedStepId", finalizedStepId).
Error("Invalid length of shards in parent shard info")
return errors.Wrap(errUnknownParent, "Failed to process reorg")
}

if err := s.db.RemoveShardingInfos(finalizedStepId + 1); err != nil {
log.WithError(err).Error("Could not revert shard info from DB during reorg")
return err
}

if err := s.db.SaveLatestStepID(finalizedStepId); err != nil {
log.WithError(err).Error("Could not store latest step id during reorg")
return err
}

// Remove pending sharding infos and headers from the vanguard and pandora caches
s.vanguardPendingShardingCache.Purge()
s.pandoraPendingHeaderCache.Purge()

reorgInfo := &types.Reorg{
VanParentHash: parentShardInfo.SlotInfo.BlockRoot.Bytes(),
PanParentHash: parentShardInfo.Shards[0].Blocks[0].HeaderRoot.Bytes(),
}

// reset subscriptions: resubscribe to pandora and stop the vanguard subscription with the reorg info
s.pandoraService.Resubscribe()
s.vanguardService.StopSubscription(reorgInfo)

return nil
}
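processReorg rewinds the verified shard info db to the finalized checkpoint: every step id above finalizedStepId is removed and the latest step id pointer is reset. A small in-memory sketch of that rewind (toyDB is a stand-in assuming the semantics of RemoveShardingInfos and SaveLatestStepID shown above, not the actual db):

package main

import "fmt"

// toyDB stands in for the verified shard info db; this in-memory version is
// only an illustration of the rewind, not the real store.
type toyDB struct {
	shardInfoByStep map[uint64]string
	latestStepID    uint64
}

// removeShardingInfos drops every shard info at or above fromStepID,
// mirroring the RemoveShardingInfos(finalizedStepId + 1) call above.
func (db *toyDB) removeShardingInfos(fromStepID uint64) {
	for step := range db.shardInfoByStep {
		if step >= fromStepID {
			delete(db.shardInfoByStep, step)
		}
	}
}

// revertToFinalized is the core of the reorg: rewind the db and the latest
// step id pointer back to the finalized checkpoint.
func revertToFinalized(db *toyDB, finalizedStepID uint64) {
	db.removeShardingInfos(finalizedStepID + 1)
	db.latestStepID = finalizedStepID
}

func main() {
	db := &toyDB{
		shardInfoByStep: map[uint64]string{1: "a", 2: "b", 3: "c"},
		latestStepID:    3,
	}
	revertToFinalized(db, 1) // finalized at step 1: steps 2 and 3 are dropped
	fmt.Println(db.shardInfoByStep, db.latestStepID) // map[1:a] 1
}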
47 changes: 12 additions & 35 deletions orchestrator/consensus/service.go
@@ -2,7 +2,9 @@ package consensus

import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/event"

@@ -13,6 +15,10 @@ import (
"github.com/lukso-network/lukso-orchestrator/shared/types"
)

var (
errUnknownParent = errors.New("unknown parent")
)

const (
TotalExecutionShardCount = 1
ShardsPerVanBlock = 1
@@ -40,7 +46,7 @@ type Service struct {
vanguardService iface.VanguardService
pandoraService iface2.PandoraService
verifiedSlotInfoFeed event.Feed
reorgInProgress bool
reorgInProgress uint32
}

//
@@ -68,68 +74,39 @@ func (s *Service) Start() {
go func() {
log.Info("Starting consensus service")
vanShardInfoCh := make(chan *types.VanguardShardInfo, 1)
reorgSignalCh := make(chan *types.Reorg, 1)
panHeaderInfoCh := make(chan *types.PandoraHeaderInfo, 1)

vanShardInfoSub := s.vanguardService.SubscribeShardInfoEvent(vanShardInfoCh)
vanShutdownSub := s.vanguardService.SubscribeShutdownSignalEvent(reorgSignalCh)
panHeaderInfoSub := s.pandoraService.SubscribeHeaderInfoEvent(panHeaderInfoCh)

for {
select {
case newPanHeaderInfo := <-panHeaderInfoCh:
if s.reorgInProgress {
if atomic.LoadUint32(&s.reorgInProgress) == 1 {
log.WithField("slot", newPanHeaderInfo.Slot).Info("Reorg is progressing, so skipping new pandora header")
continue
}

if err := s.processPandoraHeader(newPanHeaderInfo); err != nil {
log.WithField("error", err).Error("Error found while processing pandora header")
log.WithField("error", err).Error("Could not process pandora shard info, exiting consensus service")
return
}

case newVanShardInfo := <-vanShardInfoCh:
if s.reorgInProgress {
if atomic.LoadUint32(&s.reorgInProgress) == 1 {
log.WithField("slot", newVanShardInfo.Slot).Info("Reorg is progressing, so skipping new vanguard shard")
continue
}

if err := s.processVanguardShardInfo(newVanShardInfo); err != nil {
log.WithField("error", err).Error("Error found while processing vanguard sharding info")
log.WithField("error", err).Error("Could not process vanguard shard info, exiting consensus service")
return
}

case reorgInfo := <-reorgSignalCh:
if reorgInfo == nil {
log.Error("Received shutdown signal but value not set. So we are doing nothing")
continue
}
s.reorgInProgress = true
// reorg happened. So remove info from database
finalizedSlot := s.db.FinalizedSlot()
finalizedEpoch := s.db.FinalizedEpoch()
log.WithField("curSlot", reorgInfo.NewSlot).WithField("revertSlot", finalizedSlot).
WithField("finalizedEpoch", finalizedEpoch).Warn("Triggered reorg event")

if err := s.reorgDB(finalizedSlot); err != nil {
log.WithError(err).Warn("Failed to revert verified info db, exiting consensus go routine")
return
}
// Removing slot infos from vanguard cache and pandora cache
s.vanguardPendingShardingCache.Purge()
s.pandoraPendingHeaderCache.Purge()

// disconnect subscription
s.pandoraService.Resubscribe()
s.vanguardService.StopSubscription()

s.reorgInProgress = false

case <-s.ctx.Done():
vanShardInfoSub.Unsubscribe()
vanShutdownSub.Unsubscribe()
panHeaderInfoSub.Unsubscribe()
log.Info("Received cancelled context,closing existing consensus service")
log.Info("Received cancelled context, existing consensus service")
return
}
}
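The Service field reorgInProgress changes from bool to uint32 so the event loop and the reorg path can share it across goroutines through sync/atomic rather than racing on a plain bool. A compact sketch of the guard pattern (an illustration, not the service's code):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reorgGuard shows the pattern the diff switches to: a uint32 flag driven by
// sync/atomic, readable and writable from different goroutines without a
// data race.
type reorgGuard struct{ inProgress uint32 }

func (g *reorgGuard) begin() { atomic.StoreUint32(&g.inProgress, 1) }
func (g *reorgGuard) end()   { atomic.StoreUint32(&g.inProgress, 0) }
func (g *reorgGuard) active() bool {
	return atomic.LoadUint32(&g.inProgress) == 1
}

func main() {
	var g reorgGuard
	var wg sync.WaitGroup
	g.begin()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) { // events arriving while a reorg runs are skipped
			defer wg.Done()
			if g.active() {
				fmt.Printf("event %d skipped during reorg\n", n)
			}
		}(i)
	}
	wg.Wait()
	g.end()
}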
19 changes: 9 additions & 10 deletions orchestrator/consensus/sharding.go
@@ -9,7 +9,7 @@ import (
eth2Types "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)

func CompareShardingInfo(ph *eth1Types.Header, vs *eth2Types.PandoraShard) bool {
func compareShardingInfo(ph *eth1Types.Header, vs *eth2Types.PandoraShard) bool {
if ph == nil && vs == nil {
// in existing code this can happen, as some parts may have no sharding info for testing.
return true
@@ -84,24 +84,23 @@ func CompareShardingInfo(ph *eth1Types.Header, vs *eth2Types.PandoraShard) bool

// verifyConsecutiveHashes checks the parent hashes of the current vanguard and pandora block headers.
// It retrieves the latest verified slot info and compares it against the incoming vanguard and pandora blocks' parent hashes.
func (s *Service) verifyConsecutiveHashes(ph *eth1Types.Header, vs *types.VanguardShardInfo) bool {

func (s *Service) verifyConsecutiveHashes(ph *eth1Types.Header, vs *types.VanguardShardInfo) (verificationStatus, triggerReorg bool) {
latestStepId := s.db.LatestStepID()
// short circuit when the latest step id is 0 or 1. For the first step id we cannot verify consecutiveness,
// so return true here
if latestStepId <= 1 {
return true
return true, false
}

shardInfo, err := s.db.VerifiedShardInfo(latestStepId)
if err != nil {
log.WithError(err).WithField("latestStepId", latestStepId).Error("Could not found shard info from DB")
return false
return false, false
}

if shardInfo == nil {
log.WithField("latestStepId", latestStepId).Debug("Could not found shard info from DB")
return false
return false, false
}

vParentHash := common.BytesToHash(vs.ParentHash)
@@ -110,14 +109,14 @@ func (s *Service) verifyConsecutiveHashes(ph *eth1Types.Header, vs *types.Vangua
if shardInfo.SlotInfo.BlockRoot != vParentHash {
log.WithField("lastVerifiedVanHash", shardInfo.SlotInfo.BlockRoot).WithField("curVanParentHash", vParentHash).
Debug("Invalid vanguard parent hash")
return false
return false, true
}

if len(shardInfo.Shards) > 0 && len(shardInfo.Shards[0].Blocks) > 0 && shardInfo.Shards[0].Blocks[0].HeaderRoot != pParentHash {
log.WithField("lastVerifiedPanHash", shardInfo.Shards[0].Blocks[0].HeaderRoot).
WithField("curPanParentHash", pParentHash).Debug("Invalid pandora parent hash")
return false
return false, false
}

return true
return true, false
}
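verifyConsecutiveHashes now reports two facts: whether verification passed, and whether a reorg should be triggered. Only a vanguard parent-hash mismatch requests a reorg; a pandora mismatch merely fails verification. A toy decision-table sketch of those outcomes (not the repository's code):

package main

import "fmt"

// checkParents condenses the decision table of verifyConsecutiveHashes: a
// vanguard parent mismatch is the only case that asks for a reorg, while a
// pandora parent mismatch just fails verification.
func checkParents(vanParentOK, panParentOK bool) (verified, triggerReorg bool) {
	if !vanParentOK {
		return false, true
	}
	if !panParentOK {
		return false, false
	}
	return true, false
}

func main() {
	for _, tc := range []struct{ van, pan bool }{
		{true, true}, {false, true}, {true, false},
	} {
		v, r := checkParents(tc.van, tc.pan)
		fmt.Printf("vanOK=%v panOK=%v -> verified=%v reorg=%v\n", tc.van, tc.pan, v, r)
	}
}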
2 changes: 1 addition & 1 deletion orchestrator/consensus/test_setup.go
@@ -23,7 +23,7 @@ func (mc *mockFeedService) SubscribeShutdownSignalEvent(signals chan<- *types.Re
return mc.scope.Track(mc.subscriptionShutdownFeed.Subscribe(signals))
}

func (mc *mockFeedService) StopSubscription() {
func (mc *mockFeedService) StopSubscription(reorgInfo *types.Reorg) {
panic("implement me")
}

2 changes: 2 additions & 0 deletions orchestrator/db/iface/interface.go
@@ -2,6 +2,7 @@ package iface

import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/lukso-network/lukso-orchestrator/shared/types"
"io"
)
@@ -28,6 +29,7 @@ type ReadOnlyVerifiedShardInfoDatabase interface {
GetStepIdBySlot(slot uint64) (uint64, error)
FinalizedSlot() uint64
FinalizedEpoch() uint64
FindAncestor(fromStepId, toStepId uint64, blockHash common.Hash) (*types.MultiShardInfo, error)
}

type VerifiedShardInfoDatabase interface {
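FindAncestor is only declared here; its implementation is not part of this diff. One plausible shape (an assumption for illustration, not the actual implementation) walks step ids backwards from fromStepId to toStepId and returns the first shard info whose block root matches:

package main

import (
	"errors"
	"fmt"
)

type hash [32]byte

type shardInfo struct{ blockRoot hash }

// findAncestor is a hypothetical sketch of the FindAncestor method declared
// above: scan step ids from fromStepId down to toStepId and return the first
// shard info whose block root matches blockHash.
func findAncestor(infos map[uint64]*shardInfo, fromStepId, toStepId uint64, blockHash hash) (*shardInfo, error) {
	for step := fromStepId; step >= toStepId && step > 0; step-- {
		if info, ok := infos[step]; ok && info.blockRoot == blockHash {
			return info, nil
		}
	}
	return nil, errors.New("unknown parent")
}

func main() {
	target := hash{0xab}
	infos := map[uint64]*shardInfo{1: {hash{0x01}}, 2: {target}, 3: {hash{0x03}}}
	info, err := findAncestor(infos, 3, 1, target)
	fmt.Println(info != nil, err) // true <nil>
}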
2 changes: 1 addition & 1 deletion orchestrator/db/kv/kv.go
@@ -123,7 +123,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
latestStepId := kv.LatestStepID()

log.WithField("finalizedSlot", finalizedSlot).WithField("finalizedEpoch", finalizedEpoch).
WithField("latestEpoch", latestEpoch).WithField("latestStepId", latestStepId).Info("Initial db")
WithField("latestEpoch", latestEpoch).WithField("latestStepIdKey", latestStepId).Info("Initial db")

return kv, err
}
