From 3a23c8048549262ae77f2fc7d0fe2e24de613bbe Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Fri, 16 Feb 2024 12:43:51 -0300 Subject: [PATCH 1/6] add core event to handle requests to start a snap sync --- core/core.go | 12 ++++++++++++ core/events.go | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/core/core.go b/core/core.go index cbe487ea4a..ca654020b2 100644 --- a/core/core.go +++ b/core/core.go @@ -74,6 +74,8 @@ type Core struct { syncTarget *types.Header // sync target header decided based on Best Prime Block as the target to sync to + snapSyncFeed event.Feed // Snap sync feed + normalListBackoff uint64 // normalListBackoff is the multiple on c_normalListProcCounter which delays the proc on normal list quit chan struct{} // core quit channel @@ -960,6 +962,16 @@ func (c *Core) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription return c.sl.hc.bc.SubscribeBlockProcessingEvent(ch) } +// SubscribeSnapSyncStartEvent returns a subscription to snap sync start events. +func (c *Core) SubscribeSnapSyncStartEvent(ch chan<- SnapSyncStartEvent) event.Subscription { + return c.snapSyncFeed.Subscribe(ch) +} + +// triggerSnapSyncStart sends a snap sync start event to all subscribers. +func (c *Core) triggerSnapSyncStart(blockNumber uint64) { + c.snapSyncFeed.Send(SnapSyncStartEvent{BlockNumber: blockNumber}) +} + // Export writes the active chain to the given writer. func (c *Core) Export(w io.Writer) error { return c.sl.hc.Export(w) diff --git a/core/events.go b/core/events.go index 2c3aa11ad2..1d0a0c14d2 100644 --- a/core/events.go +++ b/core/events.go @@ -42,3 +42,8 @@ type ChainSideEvent struct { } type ChainHeadEvent struct{ Block *types.Block } + +// SnapSyncStartEvent represents a request to start the snap sync process. +type SnapSyncStartEvent struct { + BlockNumber uint64 // The block number from which to start snap syncing +} From a9667941eec679667d2d02e2bf60250fd70f6290 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Fri, 16 Feb 2024 12:45:23 -0300 Subject: [PATCH 2/6] add functionality into 'trie' package to handle request/response of trie nodes --- trie/node.go | 8 -------- trie/snap.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 8 deletions(-) create mode 100644 trie/snap.go diff --git a/trie/node.go b/trie/node.go index b625d7b5bb..c9fe69298b 100644 --- a/trie/node.go +++ b/trie/node.go @@ -223,11 +223,3 @@ func wrapError(err error, ctx string) error { func (err *decodeError) Error() string { return fmt.Sprintf("%v (decode path: %s)", err.what, strings.Join(err.stack, "<-")) } - -// Used to send a trie node request to a peer -type TrieNodeRequest struct{} - -// Used to get a trie node response from a peer -type TrieNodeResponse struct { - NodeData []byte -} diff --git a/trie/snap.go b/trie/snap.go new file mode 100644 index 0000000000..cfdf3620d7 --- /dev/null +++ b/trie/snap.go @@ -0,0 +1,56 @@ +package trie + +import ( + "github.com/dominant-strategies/go-quai/common" +) + +// Used to send a trie node request to a peer +type TrieNodeRequest struct { + BlockHash common.Hash // Hash of the block to which the requested trie node belongs + NodeHash common.Hash // Hash of the trie node being requested +} + +// Used to get a trie node response from a peer +type TrieNodeResponse struct { + NodeData []byte + NodeHash common.Hash +} + +// GetTrieNode returns the trie node from a TrieNodeResponse +func (t *TrieNodeResponse) GetTrieNode() *TrieNode { + node, err := decodeNode(t.NodeHash[:], t.NodeData) + if err != nil { + panic(err) + } + return &TrieNode{n: node} +} + +// TrieNode is a public wrapper around a trie node that exposes +// methods for handling trie nodes. +type TrieNode struct { + n node +} + +// ChildHashes returns the hashes of the children of a fullNode trie node +func (t *TrieNode) ChildHashes() []common.Hash { + switch n := t.n.(type) { + case *fullNode: + hashes := make([]common.Hash, 0, 17) + for _, child := range n.Children { + if child != nil { + // child MUST be a hashNode + hn, _ := child.(hashNode) + hashes = append(hashes, common.BytesToHash(hn)) + } + } + return hashes + default: + return nil + } +} + +// IsFullNode returns true if the trie node is a fullNode +func (t *TrieNode) IsFullNode() bool { + _, ok := t.n.(*fullNode) + return ok +} From c1cb2fbacef57f2454f0ccd8a40b7837c3c6f172 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Tue, 20 Feb 2024 14:00:53 -0300 Subject: [PATCH 3/6] refactor SetSyncTarget to avoid header nil pointer --- core/core.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/core.go b/core/core.go index ca654020b2..37c0ece4ea 100644 --- a/core/core.go +++ b/core/core.go @@ -396,7 +396,7 @@ func (c *Core) addToQueueIfNotAppended(block *types.Block) { // SetSyncTarget sets the sync target entropy based on the prime blocks func (c *Core) SetSyncTarget(header *types.Header) { - if c.sl.subClients == nil || header.Hash() == c.sl.config.GenesisHash { + if c.sl.subClients == nil || header.Hash() == c.sl.config.GenesisHash || header == nil { return } @@ -410,10 +410,8 @@ func (c *Core) SetSyncTarget(header *types.Header) { nodeCtx := c.NodeLocation().Context() // Set Sync Target for subs if nodeCtx != common.ZONE_CTX { - if header != nil { - if c.sl.subClients[header.Location().SubIndex(nodeLocation)] != nil { - c.sl.subClients[header.Location().SubIndex(nodeLocation)].SetSyncTarget(context.Background(), header) - } + if subClient := c.sl.subClients[header.Location().SubIndex(nodeLocation)]; subClient != nil { + subClient.SetSyncTarget(context.Background(), header) } } if c.syncTarget == nil || c.syncTarget.ParentEntropy(nodeCtx).Cmp(header.ParentEntropy(nodeCtx)) < 0 { From e11525a7715d029ccfb1aeb6e09eded1d954c0a2 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Tue, 20 Feb 2024 18:18:13 -0300 Subject: [PATCH 4/6] implement TriggerSnapSync API --- core/core.go | 18 ++++++++++++++++++ internal/quaiapi/backend.go | 1 + internal/quaiapi/quai_api.go | 9 +++++++++ quai/api_backend.go | 4 ++++ quaiclient/quaiclient.go | 5 +++++ 5 files changed, 37 insertions(+) diff --git a/core/core.go b/core/core.go index 37c0ece4ea..438d56b96f 100644 --- a/core/core.go +++ b/core/core.go @@ -433,6 +433,17 @@ func (c *Core) SyncTargetEntropy() (*big.Int, *big.Int) { } } +// TriggerSnapSync triggers snap sync at the zone level +func (c *Core) TriggerSnapSync(header *types.Header) { + if nodeCtx := c.NodeLocation().Context(); nodeCtx != common.ZONE_CTX { + for _, subClient := range c.sl.subClients { + subClient.TriggerSnapSync(context.Background(), header) + } + } else { + c.triggerSnapSyncStart(header.Number(c.sl.NodeCtx()).Uint64()) + } +} + // addToAppendQueue adds a block to the append queue func (c *Core) addToAppendQueue(block *types.Block) error { nodeCtx := c.NodeLocation().Context() @@ -625,10 +636,17 @@ func (c *Core) WriteBlock(block *types.Block) { if nodeCtx == common.PRIME_CTX { if block != nil { c.SetSyncTarget(block.Header()) + if c.shouldStartSnapSync(block) { + c.TriggerSnapSync(block.Header()) + } } } } +func (c *Core) shouldStartSnapSync(block *types.Block) bool { + panic("TODO: implement") +} + func (c *Core) Append(header *types.Header, manifest types.BlockManifest, domPendingHeader *types.Header, domTerminus common.Hash, domOrigin bool, newInboundEtxs types.Transactions) (types.Transactions, bool, bool, error) { nodeCtx := c.NodeCtx() newPendingEtxs, subReorg, setHead, err := c.sl.Append(header, domPendingHeader, domTerminus, domOrigin, newInboundEtxs) diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index dfdb598a06..98938ff72e 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -92,6 +92,7 @@ type Backend interface { GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) SetSyncTarget(header *types.Header) + TriggerSnapSync(header *types.Header) ProcessingState() bool GetSlicesRunning() []common.Location diff --git a/internal/quaiapi/quai_api.go b/internal/quaiapi/quai_api.go index cf1bd511bc..a73a21da28 100644 --- a/internal/quaiapi/quai_api.go +++ b/internal/quaiapi/quai_api.go @@ -887,6 +887,15 @@ func (s *PublicBlockChainQuaiAPI) SetSyncTarget(ctx context.Context, raw json.Ra return nil } +func (s *PublicBlockChainQuaiAPI) TriggerSnapSync(ctx context.Context, raw json.RawMessage) error { + var header *types.Header + if err := json.Unmarshal(raw, &header); err != nil { + return err + } + s.b.TriggerSnapSync(header) + return nil +} + // ListRunningChains returns the running locations where the node is serving data. func (s *PublicBlockChainQuaiAPI) ListRunningChains() []common.Location { return s.b.GetSlicesRunning() diff --git a/quai/api_backend.go b/quai/api_backend.go index 712ae1db17..c2b10b1b78 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -549,6 +549,10 @@ func (b *QuaiAPIBackend) SetSyncTarget(header *types.Header) { b.quai.core.SetSyncTarget(header) } +func (b *QuaiAPIBackend) TriggerSnapSync(header *types.Header) { + b.quai.core.TriggerSnapSync(header) +} + func (b *QuaiAPIBackend) Logger() *log.Logger { return b.quai.logger } diff --git a/quaiclient/quaiclient.go b/quaiclient/quaiclient.go index da6fe0bc6a..07480f97b6 100644 --- a/quaiclient/quaiclient.go +++ b/quaiclient/quaiclient.go @@ -271,6 +271,11 @@ func (ec *Client) SetSyncTarget(ctx context.Context, header *types.Header) { ec.c.CallContext(ctx, nil, "quai_setSyncTarget", fields) } +func (ec *Client) TriggerSnapSync(ctx context.Context, header *types.Header) { + fields := header.RPCMarshalHeader() + ec.c.CallContext(ctx, nil, "quai_triggerSnapSync", fields) +} + //// Miner APIS // GetPendingHeader gets the latest pending header from the chain. From 03aaefdaa9b1ffc996459a4f9b17f650f784aba4 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Fri, 16 Feb 2024 12:46:28 -0300 Subject: [PATCH 5/6] define basic downloader functionality to handler snap sync requests --- quai/backend.go | 2 +- quai/downloader/downloader.go | 50 +++++++++ quai/downloader/fetcher.go | 185 ++++++++++++++++++++++++++++++++++ quai/downloader/interface.go | 11 ++ quai/handler.go | 53 ++++++++-- 5 files changed, 293 insertions(+), 8 deletions(-) create mode 100644 quai/downloader/downloader.go create mode 100644 quai/downloader/fetcher.go create mode 100644 quai/downloader/interface.go diff --git a/quai/backend.go b/quai/backend.go index c646326457..efa445a858 100644 --- a/quai/backend.go +++ b/quai/backend.go @@ -232,7 +232,7 @@ func New(stack *node.Node, p2p NetworkingAPI, config *quaiconfig.Config, nodeCtx quai.p2p.Subscribe(config.NodeLocation, common.Hash{}) quai.p2p.Subscribe(config.NodeLocation, &types.Transaction{}) - quai.handler = newHandler(quai.p2p, quai.core, config.NodeLocation) + quai.handler = newHandler(quai.p2p, quai.core, config.NodeLocation, chainDb) // Start the handler quai.handler.Start() diff --git a/quai/downloader/downloader.go b/quai/downloader/downloader.go new file mode 100644 index 0000000000..c92fd1adb0 --- /dev/null +++ b/quai/downloader/downloader.go @@ -0,0 +1,50 @@ +package downloader + +import ( + "sync" + + "bytes" + + "github.com/dominant-strategies/go-quai/common" + "github.com/dominant-strategies/go-quai/crypto" + "github.com/dominant-strategies/go-quai/ethdb" + "github.com/pkg/errors" +) + +type Downloader struct { + p2pNode P2PNode + f *fetcher +} + +func NewDownloader(p2pNode P2PNode, chainDb ethdb.Database, quitCh chan struct{}) *Downloader { + f := &fetcher{ + p2pNode: p2pNode, + db: chainDb, + mu: sync.Mutex{}, + quitCh: quitCh, + } + return &Downloader{ + p2pNode: p2pNode, + f: f, + } +} + +func (d *Downloader) StartSnapSync(loc common.Location, blockNumber uint64) error { + block, err := d.f.fetchBlock(loc, blockNumber) + if err != nil { + return errors.Errorf("failed to fetch block %d: %v", blockNumber, err) + } + + err = d.f.fetchStateTrie(block.Hash(), block.Header().EVMRoot()) + if err != nil { + return errors.Wrap(err, "failed to fetch state trie") + } + + return nil +} + +// VerifyNodeHash verifies a expected hash against the RLP encoding of the received trie node +func verifyNodeHash(rlpBytes []byte, expectedHash []byte) bool { + hash := crypto.Keccak256(rlpBytes) + return bytes.Equal(hash, expectedHash) +} diff --git a/quai/downloader/fetcher.go b/quai/downloader/fetcher.go new file mode 100644 index 0000000000..410d0060f1 --- /dev/null +++ b/quai/downloader/fetcher.go @@ -0,0 +1,185 @@ +package downloader + +import ( + "sync" + "time" + + "github.com/dominant-strategies/go-quai/common" + "github.com/dominant-strategies/go-quai/core/types" + "github.com/dominant-strategies/go-quai/ethdb" + "github.com/dominant-strategies/go-quai/trie" + "github.com/pkg/errors" +) + +const ( + // c_fetchTimeout is the timeout for fetching a structure from the network + c_fetchTimeout = 30 * time.Second + + // c_numTrieWorkers is the number of concurrent workers to fetch trie nodes + c_numTrieWorkers = 4 + + // c_fetchRetries is the number of times to retry fetching a trie node + c_fetchRetries = 3 +) + +type fetcher struct { + p2pNode P2PNode + queue chan common.Hash // queue for trie node hashes that need to be fetched + fetched map[common.Hash]bool // map to keep track of fetched or scheduled hashes to avoid duplicates + mu sync.Mutex // mutex to protect the fetched map + db ethdb.Database // local database to save the fetched trie nodes + quitCh chan struct{} // channel to signal the fetcher to stop +} + +// FetchBlock fetches a single block by its number. +func (d *fetcher) fetchBlock(loc common.Location, blockNumber uint64) (*types.Block, error) { + blockChan := d.p2pNode.Request(loc, blockNumber, types.Block{}) + select { + case block := <-blockChan: + return block.(*types.Block), nil + case <-time.After(c_fetchTimeout): + return nil, errors.Errorf("timeout fetching block %d", blockNumber) + case <-d.quitCh: + return nil, errors.New("fetcher stopped") + } +} + +// FetchBlockHeader fetches a single block header by its number. +func (d *fetcher) fetchBlockHeader(loc common.Location, blockNumber uint64) (*types.Header, error) { + headerChan := d.p2pNode.Request(loc, blockNumber, types.Header{}) + select { + case header := <-headerChan: + return header.(*types.Header), nil + case <-time.After(c_fetchTimeout): + return nil, errors.Errorf("timeout fetching block header %d", blockNumber) + case <-d.quitCh: + return nil, errors.New("fetcher stopped") + } +} + +// FetchStateTrie fetches the state trie of a block by its root hash. +func (f *fetcher) fetchStateTrie(blockHash, rootHash common.Hash) error { + // Initialize the fetched map + f.fetched = make(map[common.Hash]bool) + + // Initialize the queue + f.queue = make(chan common.Hash, 1000) + defer close(f.queue) + + // Start with the root hash + f.queue <- rootHash + // Start c_numTrieWorkers workers to fetch and process trie nodes + wg := sync.WaitGroup{} + for i := 0; i < c_numTrieWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case nodeHash := <-f.queue: + err := f.processNode(blockHash, nodeHash) + if err != nil { + panic("Implement") + } + case <-f.quitCh: + return + } + } + }() + } + + // Wait for all workers to finish + wg.Wait() + + return nil +} + +// FetchTrieNode sends a network request to fetch a trie node. +func (f *fetcher) fetchTrieNode(blockHash common.Hash, nodeHash common.Hash) (*trie.TrieNodeResponse, error) { + trieNodeReq := trie.TrieNodeRequest{ + NodeHash: nodeHash, + } + retries := 0 + for { + // Send the request to the network + trieChan := f.p2pNode.Request(common.Location{}, blockHash, trieNodeReq) + select { + case trieNode := <-trieChan: + trieNodeResp := trieNode.(*trie.TrieNodeResponse) + return trieNodeResp, nil + case <-time.After(c_fetchTimeout): + // Retry fetching the trie node + retries++ + if retries > c_fetchRetries { + return nil, errors.Errorf("timeout fetching trie node %x", nodeHash) + } + case <-f.quitCh: + return nil, errors.New("fetcher stopped") + } + } + +} + +// ProcessNode fetches a trie node from the network and processes it. +// It verifies the trie node's hash and saves it to local storage. +// If the trie node is a fullNode, it enqueues its children for processing. +func (f *fetcher) processNode(blockHash, nodeHash common.Hash) error { + // check if the node has already been fetched + if f.isFetched(nodeHash) { + return nil + } + + trieNodeResp, err := f.fetchTrieNode(blockHash, nodeHash) + if err != nil { + return err + } + + // Verify the trie node's hash + if !verifyNodeHash(trieNodeResp.NodeData, trieNodeResp.NodeHash[:]) { + // TODO: Handle invalid trie node hash. Report bad peer, etc. + panic("Implement") + } + + // save the trie node to local storage + err = f.commit(trieNodeResp) + if err != nil { + return err + } + + // Mark the node as fetched + f.addFetched(nodeHash) + + // Get the trie node from the response + trieNode := trieNodeResp.GetTrieNode() + + // If trieNode is a fullNode, enqueue its children for processing + if trieNode.IsFullNode() { + for _, childHash := range trieNode.ChildHashes() { + if f.isFetched(childHash) { + continue + } + f.queue <- childHash + } + } + + return nil +} + +// Commit saves the trie node to local storage. +func (f *fetcher) commit(trieNodeResp *trie.TrieNodeResponse) error { + return f.db.Put(trieNodeResp.NodeHash[:], trieNodeResp.NodeData) +} + +// IsFetched returns true if the trie node has already been fetched. +func (f *fetcher) isFetched(nodeHash common.Hash) bool { + f.mu.Lock() + defer f.mu.Unlock() + return f.fetched[nodeHash] +} + +// AddFetched marks the trie node as fetched. +func (f *fetcher) addFetched(nodeHash common.Hash) { + f.mu.Lock() + defer f.mu.Unlock() + f.fetched[nodeHash] = true +} diff --git a/quai/downloader/interface.go b/quai/downloader/interface.go new file mode 100644 index 0000000000..5d3fd2fe5b --- /dev/null +++ b/quai/downloader/interface.go @@ -0,0 +1,11 @@ +package downloader + +import ( + "github.com/dominant-strategies/go-quai/common" +) + +type P2PNode interface { + // Method to request data from the network + // Specify location, data hash, and data type to request + Request(common.Location, interface{}, interface{}) chan interface{} +} diff --git a/quai/handler.go b/quai/handler.go index 02824bd743..103b7a2679 100644 --- a/quai/handler.go +++ b/quai/handler.go @@ -1,14 +1,17 @@ package quai import ( + "math/big" + "sync" + "time" + "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core" "github.com/dominant-strategies/go-quai/core/types" + "github.com/dominant-strategies/go-quai/ethdb" "github.com/dominant-strategies/go-quai/event" "github.com/dominant-strategies/go-quai/log" - "math/big" - "sync" - "time" + "github.com/dominant-strategies/go-quai/quai/downloader" ) const ( @@ -29,16 +32,24 @@ type handler struct { missingBlockSub event.Subscription txsCh chan core.NewTxsEvent txsSub event.Subscription - wg sync.WaitGroup - quitCh chan struct{} + + wg sync.WaitGroup + quitCh chan struct{} + + snapSyncCh chan core.SnapSyncStartEvent + snapSyncSub event.Subscription + d *downloader.Downloader } -func newHandler(p2pBackend NetworkingAPI, core *core.Core, nodeLocation common.Location) *handler { +func newHandler(p2pBackend NetworkingAPI, core *core.Core, nodeLocation common.Location, chainDb ethdb.Database) *handler { + quitCh := make(chan struct{}) + d := downloader.NewDownloader(p2pBackend, chainDb, quitCh) handler := &handler{ nodeLocation: nodeLocation, p2pBackend: p2pBackend, core: core, - quitCh: make(chan struct{}), + quitCh: quitCh, + d: d, } return handler } @@ -49,6 +60,11 @@ func (h *handler) Start() { h.missingBlockSub = h.core.SubscribeMissingBlockEvent(h.missingBlockCh) go h.missingBlockLoop() + h.snapSyncCh = make(chan core.SnapSyncStartEvent) + h.snapSyncSub = h.core.SubscribeSnapSyncStartEvent(h.snapSyncCh) + h.wg.Add(1) + go h.snapSyncLoop() + nodeCtx := h.nodeLocation.Context() if nodeCtx == common.ZONE_CTX && h.core.ProcessingState() { h.wg.Add(1) @@ -148,3 +164,26 @@ func (h *handler) checkNextPrimeBlock() { } } } + +// snapSyncLoop runs every time a SnapSyncStartEvent is received +func (h *handler) snapSyncLoop() { + defer h.wg.Done() + for { + select { + case event := <-h.snapSyncCh: + h.startSnapSync(event.BlockNumber) + case <-h.snapSyncSub.Err(): + log.Global.Error("snapSyncSub error") + return + case <-h.quitCh: + return + } + } +} + +func (h *handler) startSnapSync(blockNumber uint64) { + err := h.d.StartSnapSync(h.nodeLocation, blockNumber) + if err != nil { + panic("TODO: handle error" + err.Error()) + } +} From d17f113f6b0c8a478d57326dda0a893ecd447937 Mon Sep 17 00:00:00 2001 From: Alejo Acosta Date: Wed, 28 Feb 2024 13:39:18 -0600 Subject: [PATCH 6/6] implement GetTrieNode on responder side --- p2p/node/api.go | 3 +-- p2p/protocol/handler.go | 35 ++++++++++++++++++++++------------- p2p/protocol/interface.go | 5 ++--- quai/interface.go | 3 +-- quai/p2p_backend.go | 17 ++++++++++++----- 5 files changed, 38 insertions(+), 25 deletions(-) diff --git a/p2p/node/api.go b/p2p/node/api.go index 307759f186..7b717a266d 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -14,7 +14,6 @@ import ( "github.com/dominant-strategies/go-quai/p2p" quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol" "github.com/dominant-strategies/go-quai/quai" - "github.com/dominant-strategies/go-quai/trie" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -252,7 +251,7 @@ func (p *P2PNode) GetHeader(hash common.Hash, location common.Location) *types.H panic("TODO: implement") } -func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse { +func (p *P2PNode) GetTrieNode(hash common.Hash, location common.Location) ([]byte, error) { return p.consensus.GetTrieNode(hash, location) } diff --git a/p2p/protocol/handler.go b/p2p/protocol/handler.go index 2a752bd4d3..b14d9e7036 100644 --- a/p2p/protocol/handler.go +++ b/p2p/protocol/handler.go @@ -1,7 +1,6 @@ package protocol import ( - "errors" "io" "math/big" @@ -12,6 +11,7 @@ import ( "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p/pb" "github.com/dominant-strategies/go-quai/trie" + "github.com/pkg/errors" ) // QuaiProtocolHandler handles all the incoming requests and responds with corresponding data @@ -69,19 +69,19 @@ func handleRequest(quaiMsg *pb.QuaiRequestMessage, stream network.Stream, node Q switch query.(type) { case *common.Hash: log.Global.WithFields(log.Fields{ - "requestID": id, + "requestID": id, "decodedType": decodedType, - "location": loc, - "hash": query, - "peer": stream.Conn().RemotePeer(), + "location": loc, + "hash": query, + "peer": stream.Conn().RemotePeer(), }).Debug("Received request by hash to handle") case *big.Int: log.Global.WithFields(log.Fields{ - "requestID": id, + "requestID": id, "decodedType": decodedType, - "location": loc, - "hash": query, - "peer": stream.Conn().RemotePeer(), + "location": loc, + "hash": query, + "peer": stream.Conn().RemotePeer(), }).Debug("Received request by number to handle") default: log.Global.Errorf("unsupported request input data field type: %T", query) @@ -228,13 +228,22 @@ func handleBlockNumberRequest(id uint32, loc common.Location, number *big.Int, s } func handleTrieNodeRequest(id uint32, loc common.Location, hash common.Hash, stream network.Stream, node QuaiP2PNode) error { - trieNode := node.GetTrieNode(hash, loc) + trieNode, err := node.GetTrieNode(hash, loc) + if err != nil { + return errors.Wrapf(err, "error getting trie node for hash %s", hash) + } + if trieNode == nil { - log.Global.Tracef("trie node not found") - return nil + return errors.Errorf("trie node not found for hash %s", hash) } + + trieNodeResp := &trie.TrieNodeResponse{ + NodeHash: hash, + NodeData: trieNode, + } + log.Global.Tracef("trie node found") - data, err := pb.EncodeQuaiResponse(id, loc, trieNode) + data, err := pb.EncodeQuaiResponse(id, loc, trieNodeResp) if err != nil { return err } diff --git a/p2p/protocol/interface.go b/p2p/protocol/interface.go index cacea99841..d8340ee043 100644 --- a/p2p/protocol/interface.go +++ b/p2p/protocol/interface.go @@ -4,13 +4,12 @@ import ( "math/big" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/p2p/requestManager" - "github.com/dominant-strategies/go-quai/trie" ) // interface required to join the quai protocol network @@ -21,7 +20,7 @@ type QuaiP2PNode interface { GetBlock(hash common.Hash, location common.Location) *types.Block GetHeader(hash common.Hash, location common.Location) *types.Header GetBlockHashByNumber(number *big.Int, location common.Location) *common.Hash - GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse + GetTrieNode(hash common.Hash, location common.Location) ([]byte, error) GetRequestManager() requestManager.RequestManager GetHostBackend() host.Host diff --git a/quai/interface.go b/quai/interface.go index 9e756a87ac..bdfcdd7ef0 100644 --- a/quai/interface.go +++ b/quai/interface.go @@ -6,7 +6,6 @@ import ( "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" - "github.com/dominant-strategies/go-quai/trie" "github.com/libp2p/go-libp2p/core" ) @@ -28,7 +27,7 @@ type ConsensusAPI interface { // Asks the consensus backend to lookup a trie node by hash and location, // and return the data in the trie node. - GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse + GetTrieNode(hash common.Hash, location common.Location) ([]byte, error) } // The networking backend will implement the following interface to enable consensus to communicate with other nodes. diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index 46e700ac37..d4ea03ae6b 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -10,7 +10,7 @@ import ( "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p" "github.com/dominant-strategies/go-quai/rpc" - "github.com/dominant-strategies/go-quai/trie" + "github.com/pkg/errors" ) // QuaiBackend implements the quai consensus protocol @@ -102,10 +102,17 @@ func (qbe *QuaiBackend) OnNewBroadcast(sourcePeer p2p.PeerID, data interface{}, return true } -// GetTrieNode returns the TrieNodeResponse for a given hash -func (qbe *QuaiBackend) GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse { - // Example/mock implementation - panic("todo") +// GetTrieNode returns the encoded RLP trie node for a given hash +func (qbe *QuaiBackend) GetTrieNode(hash common.Hash, location common.Location) ([]byte, error) { + backend := *qbe.GetBackend(location) + if backend == nil { + return nil, errors.Errorf("no backend found") + } + data, err := backend.ChainDb().Get(hash.Bytes()) + if err != nil { + return nil, err + } + return data, nil } // Returns the current block height for the given location