Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Implementation of Snap sync #95

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Core struct {

syncTarget *types.Header // sync target header decided based on Best Prime Block as the target to sync to

snapSyncFeed event.Feed // Snap sync feed

normalListBackoff uint64 // normalListBackoff is the multiple on c_normalListProcCounter which delays the proc on normal list

quit chan struct{} // core quit channel
Expand Down Expand Up @@ -394,7 +396,7 @@ func (c *Core) addToQueueIfNotAppended(block *types.Block) {

// SetSyncTarget sets the sync target entropy based on the prime blocks
func (c *Core) SetSyncTarget(header *types.Header) {
if c.sl.subClients == nil || header.Hash() == c.sl.config.GenesisHash {
if c.sl.subClients == nil || header.Hash() == c.sl.config.GenesisHash || header == nil {
return
}

Expand All @@ -408,10 +410,8 @@ func (c *Core) SetSyncTarget(header *types.Header) {
nodeCtx := c.NodeLocation().Context()
// Set Sync Target for subs
if nodeCtx != common.ZONE_CTX {
if header != nil {
if c.sl.subClients[header.Location().SubIndex(nodeLocation)] != nil {
c.sl.subClients[header.Location().SubIndex(nodeLocation)].SetSyncTarget(context.Background(), header)
}
if subClient := c.sl.subClients[header.Location().SubIndex(nodeLocation)]; subClient != nil {
subClient.SetSyncTarget(context.Background(), header)
Copy link
Member

@wizeguyy wizeguyy Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this used? We need to tell the node which block to snap to, but this seems to tell the node which node to sync to.

e.g. if we have the following chain:

[block 200]<--[block 201]<--...[block 10000]<--[block 10001]<--[block 10002]
  ^
zone node is here

Region should use this method should tell the zone to sync to 10002, but when snap syncing we want to tell the node to start with a snapshot at 800 or something (some number of prime blocks before the tip).

Am I misunderstanding this method?

Copy link
Member

@wizeguyy wizeguyy Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am trying to ensure is that all nodes in a slice snapshot to the same prime block. They cannot decide on independent snap targets

}
}
if c.syncTarget == nil || c.syncTarget.ParentEntropy(nodeCtx).Cmp(header.ParentEntropy(nodeCtx)) < 0 {
Expand All @@ -433,6 +433,17 @@ func (c *Core) SyncTargetEntropy() (*big.Int, *big.Int) {
}
}

// TriggerSnapSync triggers snap sync at the zone level
func (c *Core) TriggerSnapSync(header *types.Header) {
if nodeCtx := c.NodeLocation().Context(); nodeCtx != common.ZONE_CTX {
for _, subClient := range c.sl.subClients {
subClient.TriggerSnapSync(context.Background(), header)
}
} else {
c.triggerSnapSyncStart(header.Number(c.sl.NodeCtx()).Uint64())
}
}

// addToAppendQueue adds a block to the append queue
func (c *Core) addToAppendQueue(block *types.Block) error {
nodeCtx := c.NodeLocation().Context()
Expand Down Expand Up @@ -625,10 +636,17 @@ func (c *Core) WriteBlock(block *types.Block) {
if nodeCtx == common.PRIME_CTX {
if block != nil {
c.SetSyncTarget(block.Header())
if c.shouldStartSnapSync(block) {
c.TriggerSnapSync(block.Header())
}
}
}
}

func (c *Core) shouldStartSnapSync(block *types.Block) bool {
panic("TODO: implement")
}

func (c *Core) Append(header *types.Header, manifest types.BlockManifest, domPendingHeader *types.Header, domTerminus common.Hash, domOrigin bool, newInboundEtxs types.Transactions) (types.Transactions, bool, bool, error) {
nodeCtx := c.NodeCtx()
newPendingEtxs, subReorg, setHead, err := c.sl.Append(header, domPendingHeader, domTerminus, domOrigin, newInboundEtxs)
Expand Down Expand Up @@ -960,6 +978,16 @@ func (c *Core) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription
return c.sl.hc.bc.SubscribeBlockProcessingEvent(ch)
}

// SubscribeSnapSyncStartEvent returns a subscription to snap sync start events.
func (c *Core) SubscribeSnapSyncStartEvent(ch chan<- SnapSyncStartEvent) event.Subscription {
return c.snapSyncFeed.Subscribe(ch)
}

// triggerSnapSyncStart sends a snap sync start event to all subscribers.
func (c *Core) triggerSnapSyncStart(blockNumber uint64) {
c.snapSyncFeed.Send(SnapSyncStartEvent{BlockNumber: blockNumber})
}

// Export writes the active chain to the given writer.
func (c *Core) Export(w io.Writer) error {
return c.sl.hc.Export(w)
Expand Down
5 changes: 5 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,8 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

// SnapSyncStartEvent represents a request to start the snap sync process.
type SnapSyncStartEvent struct {
BlockNumber uint64 // The block number from which to start snap syncing
}
1 change: 1 addition & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Backend interface {
GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error)
GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error)
SetSyncTarget(header *types.Header)
TriggerSnapSync(header *types.Header)
ProcessingState() bool
GetSlicesRunning() []common.Location

Expand Down
9 changes: 9 additions & 0 deletions internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,15 @@ func (s *PublicBlockChainQuaiAPI) SetSyncTarget(ctx context.Context, raw json.Ra
return nil
}

func (s *PublicBlockChainQuaiAPI) TriggerSnapSync(ctx context.Context, raw json.RawMessage) error {
var header *types.Header
if err := json.Unmarshal(raw, &header); err != nil {
return err
}
s.b.TriggerSnapSync(header)
return nil
}

// ListRunningChains returns the running locations where the node is serving data.
func (s *PublicBlockChainQuaiAPI) ListRunningChains() []common.Location {
return s.b.GetSlicesRunning()
Expand Down
3 changes: 1 addition & 2 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/dominant-strategies/go-quai/p2p"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/quai"
"github.com/dominant-strategies/go-quai/trie"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -252,7 +251,7 @@ func (p *P2PNode) GetHeader(hash common.Hash, location common.Location) *types.H
panic("TODO: implement")
}

func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse {
func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) ([]byte, error) {
return p.consensus.GetTrieNode(hash, location)
}

Expand Down
35 changes: 22 additions & 13 deletions p2p/protocol/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package protocol

import (
"errors"
"io"
"math/big"

Expand All @@ -12,6 +11,7 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p/pb"
"github.com/dominant-strategies/go-quai/trie"
"github.com/pkg/errors"
)

// QuaiProtocolHandler handles all the incoming requests and responds with corresponding data
Expand Down Expand Up @@ -69,19 +69,19 @@ func handleRequest(quaiMsg *pb.QuaiRequestMessage, stream network.Stream, node Q
switch query.(type) {
case *common.Hash:
log.Global.WithFields(log.Fields{
"requestID": id,
"requestID": id,
"decodedType": decodedType,
"location": loc,
"hash": query,
"peer": stream.Conn().RemotePeer(),
"location": loc,
"hash": query,
"peer": stream.Conn().RemotePeer(),
}).Debug("Received request by hash to handle")
case *big.Int:
log.Global.WithFields(log.Fields{
"requestID": id,
"requestID": id,
"decodedType": decodedType,
"location": loc,
"hash": query,
"peer": stream.Conn().RemotePeer(),
"location": loc,
"hash": query,
"peer": stream.Conn().RemotePeer(),
}).Debug("Received request by number to handle")
default:
log.Global.Errorf("unsupported request input data field type: %T", query)
Expand Down Expand Up @@ -228,13 +228,22 @@ func handleBlockNumberRequest(id uint32, loc common.Location, number *big.Int, s
}

func handleTrieNodeRequest(id uint32, loc common.Location, hash common.Hash, stream network.Stream, node QuaiP2PNode) error {
trieNode := node.GetTrieNode(hash, loc)
trieNode, err := node.GetTrieNode(hash, loc)
if err != nil {
return errors.Wrapf(err, "error getting trie node for hash %s", hash)
}

if trieNode == nil {
log.Global.Tracef("trie node not found")
return nil
return errors.Errorf("trie node not found for hash %s", hash)
}

trieNodeResp := &trie.TrieNodeResponse{
NodeHash: hash,
NodeData: trieNode,
}

log.Global.Tracef("trie node found")
data, err := pb.EncodeQuaiResponse(id, loc, trieNode)
data, err := pb.EncodeQuaiResponse(id, loc, trieNodeResp)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions p2p/protocol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"math/big"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/p2p/requestManager"
"github.com/dominant-strategies/go-quai/trie"
)

// interface required to join the quai protocol network
Expand All @@ -21,7 +20,7 @@ type QuaiP2PNode interface {
GetBlock(hash common.Hash, location common.Location) *types.Block
GetHeader(hash common.Hash, location common.Location) *types.Header
GetBlockHashByNumber(number *big.Int, location common.Location) *common.Hash
GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse
GetTrieNode(hash common.Hash, location common.Location) ([]byte, error)
GetRequestManager() requestManager.RequestManager
GetHostBackend() host.Host

Expand Down
4 changes: 4 additions & 0 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ func (b *QuaiAPIBackend) SetSyncTarget(header *types.Header) {
b.quai.core.SetSyncTarget(header)
}

func (b *QuaiAPIBackend) TriggerSnapSync(header *types.Header) {
b.quai.core.TriggerSnapSync(header)
}

func (b *QuaiAPIBackend) Logger() *log.Logger {
return b.quai.logger
}
Expand Down
2 changes: 1 addition & 1 deletion quai/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func New(stack *node.Node, p2p NetworkingAPI, config *quaiconfig.Config, nodeCtx
quai.p2p.Subscribe(config.NodeLocation, common.Hash{})
quai.p2p.Subscribe(config.NodeLocation, &types.Transaction{})

quai.handler = newHandler(quai.p2p, quai.core, config.NodeLocation)
quai.handler = newHandler(quai.p2p, quai.core, config.NodeLocation, chainDb)
// Start the handler
quai.handler.Start()

Expand Down
50 changes: 50 additions & 0 deletions quai/downloader/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package downloader

import (
"sync"

"bytes"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/crypto"
"github.com/dominant-strategies/go-quai/ethdb"
"github.com/pkg/errors"
)

type Downloader struct {
p2pNode P2PNode
f *fetcher
}

func NewDownloader(p2pNode P2PNode, chainDb ethdb.Database, quitCh chan struct{}) *Downloader {
f := &fetcher{
p2pNode: p2pNode,
db: chainDb,
mu: sync.Mutex{},
quitCh: quitCh,
}
return &Downloader{
p2pNode: p2pNode,
f: f,
}
}

func (d *Downloader) StartSnapSync(loc common.Location, blockNumber uint64) error {
block, err := d.f.fetchBlock(loc, blockNumber)
if err != nil {
return errors.Errorf("failed to fetch block %d: %v", blockNumber, err)
}

err = d.f.fetchStateTrie(block.Hash(), block.Header().EVMRoot())
if err != nil {
return errors.Wrap(err, "failed to fetch state trie")
}

return nil
}

// VerifyNodeHash verifies a expected hash against the RLP encoding of the received trie node
func verifyNodeHash(rlpBytes []byte, expectedHash []byte) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed yesterday, I think this can be handled entirely by libp2p, since it hashes every packet it receives. Please leverage that

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have @gameofpointers view on this as well. Since the above was agreed with him

hash := crypto.Keccak256(rlpBytes)
return bytes.Equal(hash, expectedHash)
}
Loading
Loading