From 7bed18fd0674dac9d9d55e6e1c747eb5b70cd145 Mon Sep 17 00:00:00 2001 From: gop Date: Fri, 23 Feb 2024 16:42:06 -0600 Subject: [PATCH 01/10] bugfix: to not crash the test --- core/types/stxo.go | 3 +++ core/types/transaction.go | 9 --------- quai/gasprice/gasprice.go | 1 + quai/p2p_backend.go | 3 +++ rpc/handler.go | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/types/stxo.go b/core/types/stxo.go index 9a57ad9cfa..b899653522 100644 --- a/core/types/stxo.go +++ b/core/types/stxo.go @@ -24,6 +24,9 @@ type SpentTxOut struct { // countSpentOutputs returns the number of utxos the passed block spends. func CountSpentOutputs(block *Block) int { transactions := block.QiTransactions() + if len(transactions) == 0 { + return 0 + } if IsCoinBaseTx(transactions[0]) { transactions = transactions[1:] } diff --git a/core/types/transaction.go b/core/types/transaction.go index 540e2e38f4..4c5173a3af 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -260,9 +260,6 @@ func (tx *Transaction) ProtoDecode(protoTx *ProtoTransaction, location common.Lo tx.SetInner(&itx) case 1: - if protoTx.Nonce == nil { - return errors.New("missing required field 'Nonce' in ProtoTransaction") - } if protoTx.Gas == nil { return errors.New("missing required field 'Gas' in ProtoTransaction") } @@ -278,12 +275,6 @@ func (tx *Transaction) ProtoDecode(protoTx *ProtoTransaction, location common.Lo if protoTx.To == nil { return errors.New("missing required field 'To' in ProtoTransaction") } - if protoTx.GasFeeCap == nil { - return errors.New("missing required field 'GasFeeCap' in ProtoTransaction") - } - if protoTx.GasTipCap == nil { - return errors.New("missing required field 'GasTipCap' in ProtoTransaction") - } var etx ExternalTx etx.AccessList = AccessList{} diff --git a/quai/gasprice/gasprice.go b/quai/gasprice/gasprice.go index f849153b24..6ffd79a526 100644 --- a/quai/gasprice/gasprice.go +++ b/quai/gasprice/gasprice.go @@ -137,6 +137,7 @@ func NewOracle(backend OracleBackend, params Config, logger *log.Logger) *Oracle // necessary to add the basefee to the returned number to fall back to the legacy // behavior. func (oracle *Oracle) SuggestTipCap(ctx context.Context) (*big.Int, error) { + return big.NewInt(0), nil nodeCtx := oracle.backend.ChainConfig().Location.Context() if nodeCtx != common.ZONE_CTX { return nil, errors.New("suggestTipCap can only be called in zone chains") diff --git a/quai/p2p_backend.go b/quai/p2p_backend.go index 2ca3417ef9..46e700ac37 100644 --- a/quai/p2p_backend.go +++ b/quai/p2p_backend.go @@ -115,6 +115,9 @@ func (qbe *QuaiBackend) GetHeight(location common.Location) uint64 { } func (qbe *QuaiBackend) LookupBlock(hash common.Hash, location common.Location) *types.Block { + if qbe == nil { + return nil + } backend := *qbe.GetBackend(location) if backend == nil { log.Global.Error("no backend found") diff --git a/rpc/handler.go b/rpc/handler.go index 5bf0cc5da0..5ee81b33dd 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -301,7 +301,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess if resp.Error.Data != nil { ctx = append(ctx, "errdata", resp.Error.Data) } - log.Global.Warn("Served " + msg.Method) + log.Global.Warn(ctx, "Served "+msg.Method) } else { log.Global.Debug("Served " + msg.Method) } From a56cdd608417573438ffa700d86e387cf56c2600 Mon Sep 17 00:00:00 2001 From: gop Date: Mon, 26 Feb 2024 13:27:44 -0600 Subject: [PATCH 02/10] bugfix: cannot get the signer for the Qi type transaction and catching the error --- core/tx_pool.go | 30 ++++++++++++++++++++++++------ core/types/receipt.go | 5 ++++- core/types/transaction.go | 5 ++++- core/types/transaction_signing.go | 6 ++++++ core/worker.go | 5 ++++- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index ea1f0969a1..a3e2e968eb 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -815,7 +815,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } } // Try to replace an existing transaction in the pending pool - from, _ := types.Sender(pool.signer, tx) // already validated + from, err := types.Sender(pool.signer, tx) // already validated + if err != nil { + return false, err + } internal, err := from.InternalAddress() if err != nil { return false, err @@ -876,7 +879,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // Note, this method assumes the pool lock is held! func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) { // Try to insert the transaction into the future queue - from, _ := types.Sender(pool.signer, tx) // already validated + from, err := types.Sender(pool.signer, tx) // already validated + if err != nil { + return false, err + } internal, err := from.InternalAddress() if err != nil { return false, err @@ -1185,7 +1191,10 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { if tx == nil { continue } - from, _ := types.Sender(pool.signer, tx) // already validated + from, err := types.Sender(pool.signer, tx) // already validated + if err != nil { + continue + } internal, err := from.InternalAddress() if err != nil { continue @@ -1222,7 +1231,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { if tx == nil { return } - addr, _ := types.Sender(pool.signer, tx) // already validated during insertion + addr, err := types.Sender(pool.signer, tx) // already validated during insertion + if err != nil { + return + } internal, err := addr.InternalAddress() if err != nil { return @@ -1353,7 +1365,10 @@ func (pool *TxPool) scheduleReorgLoop() { case tx := <-pool.queueTxEventCh: // Queue up the event, but don't schedule a reorg. It's up to the caller to // request one later if they want the events sent. - addr, _ := types.Sender(pool.signer, tx) + addr, err := types.Sender(pool.signer, tx) + if err != nil { + continue + } internal, err := addr.InternalAddress() if err != nil { pool.logger.WithField("err", err).Debug("Failed to queue transaction") @@ -1444,7 +1459,10 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx // Notify subsystems for newly added transactions for _, tx := range promoted { - addr, _ := types.Sender(pool.signer, tx) + addr, err := types.Sender(pool.signer, tx) + if err != nil { + continue + } internal, err := addr.InternalAddress() if err != nil { pool.logger.WithField("err", err).Debug("Failed to add transaction event") diff --git a/core/types/receipt.go b/core/types/receipt.go index f263192d64..9eb94815ae 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -386,7 +386,10 @@ func (r Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, num // The contract address can be derived from the transaction itself if r[i].ContractAddress.Equal(common.Address{}) && txs[i].To() == nil { // Deriving the signer is expensive, only do if it's actually needed - from, _ := Sender(signer, txs[i]) + from, err := Sender(signer, txs[i]) + if err != nil { + return err + } r[i].ContractAddress = crypto.CreateAddress(from, txs[i].Nonce(), txs[i].Data(), config.Location) } // The used gas can be calculated based on previous r diff --git a/core/types/transaction.go b/core/types/transaction.go index 4c5173a3af..d88dbf21e6 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -941,7 +941,10 @@ func NewTransactionsByPriceAndNonce(signer Signer, etxs []*Transaction, txs map[ } for from, accTxs := range txs { - acc, _ := Sender(signer, accTxs[0]) + acc, err := Sender(signer, accTxs[0]) + if err != nil { + continue + } wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee, nil) // Remove transaction if sender doesn't match from, or if wrapping fails. if acc.Bytes20() != from || err != nil { diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index e6e96322c7..db026ffe7a 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -106,6 +106,9 @@ func Sender(signer Signer, tx *Transaction) (common.Address, error) { if tx.Type() == ExternalTxType { // External TX does not have a signature return tx.inner.(*ExternalTx).Sender, nil } + if tx.Type() == QiTxType { + return common.Zero, errors.New("sender field not available for qi type") + } if sc := tx.from.Load(); sc != nil { sigCache := sc.(sigCache) // If the signer used to derive from in a previous @@ -169,6 +172,9 @@ func (s SignerV1) Sender(tx *Transaction) (common.Address, error) { if tx.Type() == ExternalTxType { // External TX does not have a signature return tx.inner.(*ExternalTx).Sender, nil } + if tx.Type() == QiTxType { + return common.Zero, errors.New("cannot find the sender for a qi transaction type") + } V, R, S := tx.GetEcdsaSignatureValues() // DynamicFee txs are defined to use 0 and 1 as their recovery // id, add 27 to become equivalent to unprotected signatures. diff --git a/core/worker.go b/core/worker.go index ee3d67198e..533b4588f9 100644 --- a/core/worker.go +++ b/core/worker.go @@ -784,7 +784,10 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP // during transaction acceptance is the transaction pool. // // We use the signer regardless of the current hf. - from, _ := types.Sender(env.signer, tx) + from, err := types.Sender(env.signer, tx) + if err != nil { + continue + } // Start executing the transaction env.state.Prepare(tx.Hash(), env.tcount) From e42fad643b5a4dc557eb317dee6822a5ed1586a7 Mon Sep 17 00:00:00 2001 From: Hussam Date: Wed, 7 Feb 2024 11:30:59 -0600 Subject: [PATCH 03/10] Optimize redundant peerID creation --- p2p/node/node.go | 9 +++------ p2p/peerManager/peerManager.go | 7 +++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/p2p/node/node.go b/p2p/node/node.go index bb9830886e..b1faf7e341 100644 --- a/p2p/node/node.go +++ b/p2p/node/node.go @@ -79,15 +79,9 @@ func NewNode(ctx context.Context) (*P2PNode, error) { return nil, err } - peerID, err := peer.IDFromPublicKey(getNodeKey().GetPublic()) - if err != nil { - log.Global.Fatalf("error getting self ID: %s", err) - return nil, err - } // Peer manager handles both connection management and connection gating peerMgr, err := peerManager.NewManager( ctx, - peerID, viper.GetInt(utils.MaxPeersFlag.Name), // LowWater 2*viper.GetInt(utils.MaxPeersFlag.Name), // HighWater nil, @@ -183,6 +177,9 @@ func NewNode(ctx context.Context) (*P2PNode, error) { nodeID := host.ID() log.Global.Infof("node created: %s", nodeID) + // Set peer manager's self ID + peerMgr.SetSelfID(nodeID) + // Create a gossipsub instance with helper functions ps, err := pubsubManager.NewGossipSubManager(ctx, host) diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index 3404d6bee9..e25913117d 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -94,7 +94,7 @@ type BasicPeerManager struct { ctx context.Context } -func NewManager(ctx context.Context, selfID p2p.PeerID, low int, high int, datastore datastore.Datastore) (*BasicPeerManager, error) { +func NewManager(ctx context.Context, low int, high int, datastore datastore.Datastore) (*BasicPeerManager, error) { mgr, err := basicConnMgr.NewConnManager(low, high) if err != nil { return nil, err @@ -121,7 +121,6 @@ func NewManager(ctx context.Context, selfID p2p.PeerID, low int, high int, datas } return &BasicPeerManager{ - selfID: selfID, BasicConnMgr: mgr, BasicConnectionGater: gater, bestPeersDB: bestPeersDB, @@ -150,6 +149,10 @@ func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error { return nil } +func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) { + pm.selfID = selfID +} + func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int) []p2p.PeerID { peerSubset := make([]p2p.PeerID, 0, numPeers) q := query.Query{ From 73cdd9c575cbd89d0015160b10570ab0925fd6f3 Mon Sep 17 00:00:00 2001 From: Hussam Date: Wed, 7 Feb 2024 11:39:38 -0600 Subject: [PATCH 04/10] Modify interface to remove redundant protocol restatements --- p2p/node/p2p_services.go | 2 +- p2p/protocol/interface.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index f1037670ec..821ab449b4 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -24,7 +24,7 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data "data": data, "datatype": datatype, }).Trace("Requesting the data from peer") - stream, err := p.NewStream(peerID, protocol.ProtocolVersion) + stream, err := p.NewStream(peerID) if err != nil { // TODO: should we report this peer for failure to participate? return nil, err diff --git a/p2p/protocol/interface.go b/p2p/protocol/interface.go index 5165aae71c..88d6a2c042 100644 --- a/p2p/protocol/interface.go +++ b/p2p/protocol/interface.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/protocol" "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" @@ -17,7 +16,7 @@ import ( type QuaiP2PNode interface { GetBootPeers() []peer.AddrInfo Connect(pi peer.AddrInfo) error - NewStream(peerID peer.ID, protocolID protocol.ID) (network.Stream, error) + NewStream(peerID peer.ID) (network.Stream, error) Network() network.Network // Search for a block in the node's cache, or query the consensus backend if it's not found in cache. // Returns nil if the block is not found. From e0fa40601f88f633e605c82d45b5a6e634bbf258 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 11:02:47 -0600 Subject: [PATCH 05/10] Create new QuaiMessage type envelope for request/response --- p2p/pb/quai_messages.pb.go | 232 ++++++++++++++++++++++++++++--------- p2p/pb/quai_messages.proto | 17 ++- 2 files changed, 189 insertions(+), 60 deletions(-) diff --git a/p2p/pb/quai_messages.pb.go b/p2p/pb/quai_messages.pb.go index f80a3baecf..39a03c620b 100644 --- a/p2p/pb/quai_messages.pb.go +++ b/p2p/pb/quai_messages.pb.go @@ -306,7 +306,8 @@ type QuaiResponseMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Location *common.ProtoLocation `protobuf:"bytes,2,opt,name=location,proto3" json:"location,omitempty"` // Types that are assignable to Response: // // *QuaiResponseMessage_Block @@ -356,6 +357,13 @@ func (x *QuaiResponseMessage) GetId() uint32 { return 0 } +func (x *QuaiResponseMessage) GetLocation() *common.ProtoLocation { + if x != nil { + return x.Location + } + return nil +} + func (m *QuaiResponseMessage) GetResponse() isQuaiResponseMessage_Response { if m != nil { return m.Response @@ -403,23 +411,23 @@ type isQuaiResponseMessage_Response interface { } type QuaiResponseMessage_Block struct { - Block *types.ProtoBlock `protobuf:"bytes,2,opt,name=block,proto3,oneof"` + Block *types.ProtoBlock `protobuf:"bytes,3,opt,name=block,proto3,oneof"` } type QuaiResponseMessage_Header struct { - Header *types.ProtoHeader `protobuf:"bytes,3,opt,name=header,proto3,oneof"` + Header *types.ProtoHeader `protobuf:"bytes,4,opt,name=header,proto3,oneof"` } type QuaiResponseMessage_Transaction struct { - Transaction *types.ProtoTransaction `protobuf:"bytes,4,opt,name=transaction,proto3,oneof"` + Transaction *types.ProtoTransaction `protobuf:"bytes,5,opt,name=transaction,proto3,oneof"` } type QuaiResponseMessage_BlockHash struct { - BlockHash *common.ProtoHash `protobuf:"bytes,5,opt,name=blockHash,proto3,oneof"` + BlockHash *common.ProtoHash `protobuf:"bytes,6,opt,name=blockHash,proto3,oneof"` } type QuaiResponseMessage_TrieNode struct { - TrieNode *trie.ProtoTrieNode `protobuf:"bytes,6,opt,name=trieNode,proto3,oneof"` + TrieNode *trie.ProtoTrieNode `protobuf:"bytes,7,opt,name=trieNode,proto3,oneof"` } func (*QuaiResponseMessage_Block) isQuaiResponseMessage_Response() {} @@ -432,6 +440,87 @@ func (*QuaiResponseMessage_BlockHash) isQuaiResponseMessage_Response() {} func (*QuaiResponseMessage_TrieNode) isQuaiResponseMessage_Response() {} +type QuaiMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Payload: + // + // *QuaiMessage_Request + // *QuaiMessage_Response + Payload isQuaiMessage_Payload `protobuf_oneof:"payload"` +} + +func (x *QuaiMessage) Reset() { + *x = QuaiMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_p2p_pb_quai_messages_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QuaiMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QuaiMessage) ProtoMessage() {} + +func (x *QuaiMessage) ProtoReflect() protoreflect.Message { + mi := &file_p2p_pb_quai_messages_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QuaiMessage.ProtoReflect.Descriptor instead. +func (*QuaiMessage) Descriptor() ([]byte, []int) { + return file_p2p_pb_quai_messages_proto_rawDescGZIP(), []int{4} +} + +func (m *QuaiMessage) GetPayload() isQuaiMessage_Payload { + if m != nil { + return m.Payload + } + return nil +} + +func (x *QuaiMessage) GetRequest() *QuaiRequestMessage { + if x, ok := x.GetPayload().(*QuaiMessage_Request); ok { + return x.Request + } + return nil +} + +func (x *QuaiMessage) GetResponse() *QuaiResponseMessage { + if x, ok := x.GetPayload().(*QuaiMessage_Response); ok { + return x.Response + } + return nil +} + +type isQuaiMessage_Payload interface { + isQuaiMessage_Payload() +} + +type QuaiMessage_Request struct { + Request *QuaiRequestMessage `protobuf:"bytes,1,opt,name=request,proto3,oneof"` +} + +type QuaiMessage_Response struct { + Response *QuaiResponseMessage `protobuf:"bytes,2,opt,name=response,proto3,oneof"` +} + +func (*QuaiMessage_Request) isQuaiMessage_Payload() {} + +func (*QuaiMessage_Response) isQuaiMessage_Payload() {} + var File_p2p_pb_quai_messages_proto protoreflect.FileDescriptor var file_p2p_pb_quai_messages_proto_rawDesc = []byte{ @@ -478,29 +567,42 @@ var file_p2p_pb_quai_messages_proto_rawDesc = []byte{ 0x2e, 0x74, 0x72, 0x69, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x54, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x48, 0x01, 0x52, 0x08, 0x74, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x22, 0xad, 0x02, 0x0a, 0x13, 0x51, 0x75, 0x61, 0x69, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x73, 0x74, 0x22, 0xe0, 0x02, 0x0a, 0x13, 0x51, 0x75, 0x61, 0x69, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x62, 0x6c, - 0x6f, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x6c, 0x6f, 0x63, - 0x6b, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x00, 0x52, 0x05, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x2c, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x48, 0x00, 0x52, 0x06, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x12, 0x3b, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, - 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x48, 0x00, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x31, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x18, 0x05, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x48, 0x61, 0x73, 0x68, 0x48, 0x00, 0x52, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, - 0x61, 0x73, 0x68, 0x12, 0x31, 0x0a, 0x08, 0x74, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x18, - 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x74, 0x72, 0x69, 0x65, 0x2e, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x54, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x48, 0x00, 0x52, 0x08, 0x74, 0x72, - 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x42, 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x42, 0x2f, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x64, 0x6f, 0x6d, 0x69, 0x6e, 0x61, 0x6e, 0x74, 0x2d, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, - 0x67, 0x69, 0x65, 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x71, 0x75, 0x61, 0x69, 0x2f, 0x70, 0x32, 0x70, - 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6c, 0x6f, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, + 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x4c, 0x6f, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x29, 0x0a, + 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, + 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x48, + 0x00, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x2c, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, + 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x48, 0x00, 0x52, 0x06, + 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x3b, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x6c, + 0x6f, 0x63, 0x6b, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x61, 0x73, 0x68, 0x48, 0x00, 0x52, 0x09, 0x62, 0x6c, 0x6f, + 0x63, 0x6b, 0x48, 0x61, 0x73, 0x68, 0x12, 0x31, 0x0a, 0x08, 0x74, 0x72, 0x69, 0x65, 0x4e, 0x6f, + 0x64, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x74, 0x72, 0x69, 0x65, 0x2e, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x54, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x48, 0x00, 0x52, + 0x08, 0x74, 0x72, 0x69, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x42, 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x97, 0x01, 0x0a, 0x0b, 0x51, 0x75, 0x61, 0x69, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x71, 0x75, 0x61, 0x69, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x51, 0x75, 0x61, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x71, 0x75, 0x61, 0x69, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x2e, 0x51, 0x75, 0x61, 0x69, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, + 0x2f, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x6f, + 0x6d, 0x69, 0x6e, 0x61, 0x6e, 0x74, 0x2d, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x69, 0x65, + 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x71, 0x75, 0x61, 0x69, 0x2f, 0x70, 0x32, 0x70, 0x2f, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -515,39 +617,43 @@ func file_p2p_pb_quai_messages_proto_rawDescGZIP() []byte { return file_p2p_pb_quai_messages_proto_rawDescData } -var file_p2p_pb_quai_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_p2p_pb_quai_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_p2p_pb_quai_messages_proto_goTypes = []interface{}{ (*GossipBlock)(nil), // 0: quaiprotocol.GossipBlock (*GossipTransaction)(nil), // 1: quaiprotocol.GossipTransaction (*QuaiRequestMessage)(nil), // 2: quaiprotocol.QuaiRequestMessage (*QuaiResponseMessage)(nil), // 3: quaiprotocol.QuaiResponseMessage - (*types.ProtoBlock)(nil), // 4: block.ProtoBlock - (*types.ProtoTransaction)(nil), // 5: block.ProtoTransaction - (*common.ProtoLocation)(nil), // 6: common.ProtoLocation - (*common.ProtoHash)(nil), // 7: common.ProtoHash - (*types.ProtoHeader)(nil), // 8: block.ProtoHeader - (*trie.ProtoTrieNode)(nil), // 9: trie.ProtoTrieNode + (*QuaiMessage)(nil), // 4: quaiprotocol.QuaiMessage + (*types.ProtoBlock)(nil), // 5: block.ProtoBlock + (*types.ProtoTransaction)(nil), // 6: block.ProtoTransaction + (*common.ProtoLocation)(nil), // 7: common.ProtoLocation + (*common.ProtoHash)(nil), // 8: common.ProtoHash + (*types.ProtoHeader)(nil), // 9: block.ProtoHeader + (*trie.ProtoTrieNode)(nil), // 10: trie.ProtoTrieNode } var file_p2p_pb_quai_messages_proto_depIdxs = []int32{ - 4, // 0: quaiprotocol.GossipBlock.block:type_name -> block.ProtoBlock - 5, // 1: quaiprotocol.GossipTransaction.transaction:type_name -> block.ProtoTransaction - 6, // 2: quaiprotocol.QuaiRequestMessage.location:type_name -> common.ProtoLocation - 7, // 3: quaiprotocol.QuaiRequestMessage.hash:type_name -> common.ProtoHash - 4, // 4: quaiprotocol.QuaiRequestMessage.block:type_name -> block.ProtoBlock - 8, // 5: quaiprotocol.QuaiRequestMessage.header:type_name -> block.ProtoHeader - 5, // 6: quaiprotocol.QuaiRequestMessage.transaction:type_name -> block.ProtoTransaction - 7, // 7: quaiprotocol.QuaiRequestMessage.blockHash:type_name -> common.ProtoHash - 9, // 8: quaiprotocol.QuaiRequestMessage.trieNode:type_name -> trie.ProtoTrieNode - 4, // 9: quaiprotocol.QuaiResponseMessage.block:type_name -> block.ProtoBlock - 8, // 10: quaiprotocol.QuaiResponseMessage.header:type_name -> block.ProtoHeader - 5, // 11: quaiprotocol.QuaiResponseMessage.transaction:type_name -> block.ProtoTransaction - 7, // 12: quaiprotocol.QuaiResponseMessage.blockHash:type_name -> common.ProtoHash - 9, // 13: quaiprotocol.QuaiResponseMessage.trieNode:type_name -> trie.ProtoTrieNode - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 5, // 0: quaiprotocol.GossipBlock.block:type_name -> block.ProtoBlock + 6, // 1: quaiprotocol.GossipTransaction.transaction:type_name -> block.ProtoTransaction + 7, // 2: quaiprotocol.QuaiRequestMessage.location:type_name -> common.ProtoLocation + 8, // 3: quaiprotocol.QuaiRequestMessage.hash:type_name -> common.ProtoHash + 5, // 4: quaiprotocol.QuaiRequestMessage.block:type_name -> block.ProtoBlock + 9, // 5: quaiprotocol.QuaiRequestMessage.header:type_name -> block.ProtoHeader + 6, // 6: quaiprotocol.QuaiRequestMessage.transaction:type_name -> block.ProtoTransaction + 8, // 7: quaiprotocol.QuaiRequestMessage.blockHash:type_name -> common.ProtoHash + 10, // 8: quaiprotocol.QuaiRequestMessage.trieNode:type_name -> trie.ProtoTrieNode + 7, // 9: quaiprotocol.QuaiResponseMessage.location:type_name -> common.ProtoLocation + 5, // 10: quaiprotocol.QuaiResponseMessage.block:type_name -> block.ProtoBlock + 9, // 11: quaiprotocol.QuaiResponseMessage.header:type_name -> block.ProtoHeader + 6, // 12: quaiprotocol.QuaiResponseMessage.transaction:type_name -> block.ProtoTransaction + 8, // 13: quaiprotocol.QuaiResponseMessage.blockHash:type_name -> common.ProtoHash + 10, // 14: quaiprotocol.QuaiResponseMessage.trieNode:type_name -> trie.ProtoTrieNode + 2, // 15: quaiprotocol.QuaiMessage.request:type_name -> quaiprotocol.QuaiRequestMessage + 3, // 16: quaiprotocol.QuaiMessage.response:type_name -> quaiprotocol.QuaiResponseMessage + 17, // [17:17] is the sub-list for method output_type + 17, // [17:17] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_p2p_pb_quai_messages_proto_init() } @@ -604,6 +710,18 @@ func file_p2p_pb_quai_messages_proto_init() { return nil } } + file_p2p_pb_quai_messages_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*QuaiMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_p2p_pb_quai_messages_proto_msgTypes[2].OneofWrappers = []interface{}{ (*QuaiRequestMessage_Hash)(nil), @@ -621,13 +739,17 @@ func file_p2p_pb_quai_messages_proto_init() { (*QuaiResponseMessage_BlockHash)(nil), (*QuaiResponseMessage_TrieNode)(nil), } + file_p2p_pb_quai_messages_proto_msgTypes[4].OneofWrappers = []interface{}{ + (*QuaiMessage_Request)(nil), + (*QuaiMessage_Response)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_p2p_pb_quai_messages_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/p2p/pb/quai_messages.proto b/p2p/pb/quai_messages.proto index b58b5450cb..17bc622ce8 100644 --- a/p2p/pb/quai_messages.proto +++ b/p2p/pb/quai_messages.proto @@ -32,12 +32,19 @@ message QuaiRequestMessage { // QuaiResponseMessage is the main 'envelope' for QuaiProtocol response messages message QuaiResponseMessage { uint32 id = 1; + common.ProtoLocation location = 2; oneof response { - block.ProtoBlock block = 2; - block.ProtoHeader header = 3; - block.ProtoTransaction transaction = 4; - common.ProtoHash blockHash = 5; - trie.ProtoTrieNode trieNode = 6; + block.ProtoBlock block = 3; + block.ProtoHeader header = 4; + block.ProtoTransaction transaction = 5; + common.ProtoHash blockHash = 6; + trie.ProtoTrieNode trieNode = 7; } } +message QuaiMessage { + oneof payload { + QuaiRequestMessage request = 1; + QuaiResponseMessage response = 2; + } +} \ No newline at end of file From 0e4eeb589da3f512ed6799b00990c2dc0a9565b6 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 11:12:33 -0600 Subject: [PATCH 06/10] Integrate QuaiMessage in request/response handling --- p2p/node/p2p_services.go | 27 ------- p2p/pb/proto_services.go | 50 +++++++----- p2p/protocol/handler.go | 171 +++++++++++++++++++++++++-------------- 3 files changed, 142 insertions(+), 106 deletions(-) diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index 821ab449b4..810a418db2 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -90,33 +90,6 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data return nil, errors.New("invalid response") } -func (p *P2PNode) readLoop(stream network.Stream, location common.Location) { - for { - message, err := common.ReadMessageFromStream(stream) - if err != nil { - return - } - - recvdID, recvdType, err := pb.DecodeQuaiResponse(message, location) - if err != nil { - log.Global.WithField( - "err", err, - ).Errorf("error decoding quai response: %s", err) - return - } - - dataChan, err := p.requestManager.GetRequestChan(recvdID) - if err != nil { - log.Global.WithFields(log.Fields{ - "requestID": recvdID, - "err": err, - }).Error("error associating request ID with data channel") - return - } - dataChan <- recvdType - } -} - // Creates a Cid from a location to be used as DHT key func locationToCid(location common.Location) cid.Cid { sliceBytes := []byte(location.Name()) diff --git a/p2p/pb/proto_services.go b/p2p/pb/proto_services.go index adbc1a2901..67eeb25861 100644 --- a/p2p/pb/proto_services.go +++ b/p2p/pb/proto_services.go @@ -12,6 +12,14 @@ import ( "github.com/dominant-strategies/go-quai/trie" ) +func DecodeQuaiMessage(data []byte) (*QuaiMessage, error) { + msg := &QuaiMessage{} // Assuming QuaiMessage is the struct generated by protoc + if err := proto.Unmarshal(data, msg); err != nil { + return nil, err // Return nil and the error if unmarshalling fails + } + return msg, nil // Return the decoded message and nil error if successful +} + // EncodeRequestMessage creates a marshaled protobuf message for a Quai Request. // Returns the serialized protobuf message. func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, datatype interface{}) ([]byte, error) { @@ -44,7 +52,10 @@ func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, da return nil, errors.Errorf("unsupported request data type: %T", datatype) } - return proto.Marshal(&reqMsg) + quaiMsg := QuaiMessage{ + Payload: &QuaiMessage_Request{Request: &reqMsg}, + } + return proto.Marshal(&quaiMsg) } // DecodeRequestMessage unmarshals a protobuf message into a Quai Request. @@ -54,13 +65,7 @@ func EncodeQuaiRequest(id uint32, location common.Location, data interface{}, da // 3. The location // 4. The request data // 5. An error -func DecodeQuaiRequest(data []byte) (uint32, interface{}, common.Location, interface{}, error) { - var reqMsg QuaiRequestMessage - err := proto.Unmarshal(data, &reqMsg) - if err != nil { - return 0, nil, common.Location{}, nil, err - } - +func DecodeQuaiRequest(reqMsg *QuaiRequestMessage) (uint32, interface{}, common.Location, interface{}, error) { location := &common.Location{} location.ProtoDecode(reqMsg.Location) @@ -99,10 +104,11 @@ func DecodeQuaiRequest(data []byte) (uint32, interface{}, common.Location, inter // EncodeResponse creates a marshaled protobuf message for a Quai Response. // Returns the serialized protobuf message. -func EncodeQuaiResponse(id uint32, data interface{}) ([]byte, error) { +func EncodeQuaiResponse(id uint32, location common.Location, data interface{}) ([]byte, error) { respMsg := QuaiResponseMessage{ - Id: id, + Id: id, + Location: location.ProtoEncode(), } switch data := data.(type) { @@ -112,29 +118,37 @@ func EncodeQuaiResponse(id uint32, data interface{}) ([]byte, error) { return nil, err } respMsg.Response = &QuaiResponseMessage_Block{Block: protoBlock} + case *types.Header: protoHeader, err := data.ProtoEncode() if err != nil { return nil, err } respMsg.Response = &QuaiResponseMessage_Header{Header: protoHeader} + case *types.Transaction: protoTransaction, err := data.ProtoEncode() if err != nil { return nil, err } respMsg.Response = &QuaiResponseMessage_Transaction{Transaction: protoTransaction} + case *trie.TrieNodeResponse: protoTrieNode := &trie.ProtoTrieNode{ProtoNodeData: data.NodeData} respMsg.Response = &QuaiResponseMessage_TrieNode{TrieNode: protoTrieNode} case *common.Hash: respMsg.Response = &QuaiResponseMessage_BlockHash{BlockHash: data.ProtoEncode()} + default: return nil, errors.Errorf("unsupported response data type: %T", data) } - return proto.Marshal(&respMsg) + quaiMsg := QuaiMessage{ + Payload: &QuaiMessage_Response{Response: &respMsg}, + } + + return proto.Marshal(&quaiMsg) } // Unmarshals a serialized protobuf message into a Quai Response message. @@ -142,14 +156,10 @@ func EncodeQuaiResponse(id uint32, data interface{}) ([]byte, error) { // 1. The request ID // 2. The decoded type (i.e. *types.Header, *types.Block, etc) // 3. An error -func DecodeQuaiResponse(data []byte, sourceLocation common.Location) (uint32, interface{}, error) { - var respMsg QuaiResponseMessage - err := proto.Unmarshal(data, &respMsg) - if err != nil { - return 0, nil, err - } - +func DecodeQuaiResponse(respMsg *QuaiResponseMessage) (uint32, interface{}, error) { id := respMsg.Id + sourceLocation := &common.Location{} + sourceLocation.ProtoDecode(respMsg.Location) switch respMsg.Response.(type) { case *QuaiResponseMessage_Block: @@ -158,7 +168,7 @@ func DecodeQuaiResponse(data []byte, sourceLocation common.Location) (uint32, in if protoBlock.Header.Location == nil { return id, nil, errors.New("location is nil") } - err := block.ProtoDecode(protoBlock, protoBlock.Header.GetLocation().GetValue()) + err := block.ProtoDecode(protoBlock, *sourceLocation) if err != nil { return id, nil, err } @@ -174,7 +184,7 @@ func DecodeQuaiResponse(data []byte, sourceLocation common.Location) (uint32, in case *QuaiResponseMessage_Transaction: protoTransaction := respMsg.GetTransaction() transaction := &types.Transaction{} - err := transaction.ProtoDecode(protoTransaction, sourceLocation) + err := transaction.ProtoDecode(protoTransaction, *sourceLocation) if err != nil { return id, nil, err } diff --git a/p2p/protocol/handler.go b/p2p/protocol/handler.go index ca56769f07..2a752bd4d3 100644 --- a/p2p/protocol/handler.go +++ b/p2p/protocol/handler.go @@ -4,7 +4,6 @@ import ( "errors" "io" "math/big" - "os" "github.com/libp2p/go-libp2p/core/network" @@ -32,7 +31,7 @@ func QuaiProtocolHandler(stream network.Stream, node QuaiP2PNode) { for { data, err := common.ReadMessageFromStream(stream) if err != nil { - if errors.Is(err, network.ErrReset) || errors.Is(err, io.EOF) || errors.Is(err, os.ErrDeadlineExceeded) { + if errors.Is(err, network.ErrReset) || errors.Is(err, io.EOF) { return } @@ -40,67 +39,118 @@ func QuaiProtocolHandler(stream network.Stream, node QuaiP2PNode) { // TODO: handle error continue } - id, decodedType, loc, query, err := pb.DecodeQuaiRequest(data) + + quaiMsg, err := pb.DecodeQuaiMessage(data) if err != nil { - log.Global.Errorf("error decoding quai request: %s", err) - // TODO: handle error + log.Global.Errorf("error decoding quai message: %s", err) continue } - switch query.(type) { - case *common.Hash: - log.Global.Debugf("Received request id: %d for %T, location %v hash %s from peer %s", id, decodedType, loc, query, stream.Conn().RemotePeer()) - case *big.Int: - log.Global.Debugf("Received request id: %d for %T, location %v number %s from peer %s", id, decodedType, loc, query, stream.Conn().RemotePeer()) + + switch { + case quaiMsg.GetRequest() != nil: + handleRequest(quaiMsg.GetRequest(), stream, node) + + case quaiMsg.GetResponse() != nil: + handleResponse(quaiMsg.GetResponse(), stream, node) + default: - log.Global.Errorf("unsupported request input data field type: %T", query) + log.Global.WithField("quaiMsg", quaiMsg).Errorf("unsupported quai message type") } + } +} - switch decodedType.(type) { - case *types.Block: - requestedHash := query.(*common.Hash) - err = handleBlockRequest(id, loc, *requestedHash, stream, node) - if err != nil { - log.Global.Errorf("error handling block request: %s", err) - // TODO: handle error - continue - } - case *types.Header: - requestedHash := query.(*common.Hash) - err = handleHeaderRequest(id, loc, *requestedHash, stream, node) - if err != nil { - log.Global.Errorf("error handling header request: %s", err) - // TODO: handle error - continue - } - case *types.Transaction: - requestedHash := query.(*common.Hash) - err = handleTransactionRequest(id, loc, *requestedHash, stream, node) - if err != nil { - log.Global.Errorf("error handling transaction request: %s", err) - // TODO: handle error - continue - } - case *common.Hash: - number := query.(*big.Int) - err = handleBlockNumberRequest(id, loc, number, stream, node) - if err != nil { - log.Global.Errorf("error handling block number request: %s", err) - continue - } - case trie.TrieNodeRequest: - requestedHash := query.(*common.Hash) - err := handleTrieNodeRequest(id, loc, *requestedHash, stream, node) - if err != nil { - log.Global.Errorf("error handling trie node request: %s", err) - } - default: - log.Global.Errorf("unsupported request data type: %T", decodedType) - // TODO: handle error - continue +func handleRequest(quaiMsg *pb.QuaiRequestMessage, stream network.Stream, node QuaiP2PNode) { + id, decodedType, loc, query, err := pb.DecodeQuaiRequest(quaiMsg) + if err != nil { + log.Global.WithField("err", err).Errorf("error decoding quai request") + // TODO: handle error + return + } + switch query.(type) { + case *common.Hash: + log.Global.WithFields(log.Fields{ + "requestID": id, + "decodedType": decodedType, + "location": loc, + "hash": query, + "peer": stream.Conn().RemotePeer(), + }).Debug("Received request by hash to handle") + case *big.Int: + log.Global.WithFields(log.Fields{ + "requestID": id, + "decodedType": decodedType, + "location": loc, + "hash": query, + "peer": stream.Conn().RemotePeer(), + }).Debug("Received request by number to handle") + default: + log.Global.Errorf("unsupported request input data field type: %T", query) + } + switch decodedType.(type) { + case *types.Block: + requestedHash := query.(*common.Hash) + err = handleBlockRequest(id, loc, *requestedHash, stream, node) + if err != nil { + log.Global.WithField("err", err).Error("error handling block request") + // TODO: handle error + return } + case *types.Header: + requestedHash := query.(*common.Hash) + err = handleHeaderRequest(id, loc, *requestedHash, stream, node) + if err != nil { + log.Global.WithField("err", err).Error("error handling header request") + // TODO: handle error + return + } + case *types.Transaction: + requestedHash := query.(*common.Hash) + err = handleTransactionRequest(id, loc, *requestedHash, stream, node) + if err != nil { + log.Global.WithField("err", err).Error("error handling transaction request") + // TODO: handle error + return + } + case *common.Hash: + number := query.(*big.Int) + err = handleBlockNumberRequest(id, loc, number, stream, node) + if err != nil { + log.Global.WithField("err", err).Error("error handling block number request") + return + } + case trie.TrieNodeRequest: + requestedHash := query.(*common.Hash) + err := handleTrieNodeRequest(id, loc, *requestedHash, stream, node) + if err != nil { + log.Global.WithField("err", err).Error("error handling trie node request") + } + default: + log.Global.WithField("request type", decodedType).Error("unsupported request data type") + // TODO: handle error + return + + } +} + +func handleResponse(quaiResp *pb.QuaiResponseMessage, stream network.Stream, node QuaiP2PNode) { + recvdID, recvdType, err := pb.DecodeQuaiResponse(quaiResp) + if err != nil { + log.Global.WithField( + "err", err, + ).Errorf("error decoding quai response: %s", err) + return + } + + dataChan, err := node.GetRequestManager().GetRequestChan(recvdID) + if err != nil { + log.Global.WithFields(log.Fields{ + "requestID": recvdID, + "err": err, + }).Error("error associating request ID with data channel") + return } - log.Global.Tracef("Exiting Quai Protocol Handler") + dataChan <- recvdType } // Seeks the block in the cache or database and sends it to the peer in a pb.QuaiResponseMessage @@ -113,7 +163,7 @@ func handleBlockRequest(id uint32, loc common.Location, hash common.Hash, stream } log.Global.Debugf("block found %s", block.Hash()) // create a Quai Message Response with the block - data, err := pb.EncodeQuaiResponse(id, block) + data, err := pb.EncodeQuaiResponse(id, loc, block) if err != nil { return err } @@ -121,7 +171,10 @@ func handleBlockRequest(id uint32, loc common.Location, hash common.Hash, stream if err != nil { return err } - log.Global.Debugf("Sent block %s to peer %s", block.Hash(), stream.Conn().RemotePeer()) + log.Global.WithFields(log.Fields{ + "blockHash": block.Hash(), + "peer": stream.Conn().RemotePeer(), + }).Trace("Sent block to peer") return nil } @@ -135,7 +188,7 @@ func handleHeaderRequest(id uint32, loc common.Location, hash common.Hash, strea } log.Global.Debugf("header found %s", header.Hash()) // create a Quai Message Response with the header - data, err := pb.EncodeQuaiResponse(id, header) + data, err := pb.EncodeQuaiResponse(id, loc, header) if err != nil { return err } @@ -161,7 +214,7 @@ func handleBlockNumberRequest(id uint32, loc common.Location, number *big.Int, s } log.Global.Tracef("block found %s", blockHash) // create a Quai Message Response with the block - data, err := pb.EncodeQuaiResponse(id, blockHash) + data, err := pb.EncodeQuaiResponse(id, loc, blockHash) if err != nil { return err } @@ -181,7 +234,7 @@ func handleTrieNodeRequest(id uint32, loc common.Location, hash common.Hash, str return nil } log.Global.Tracef("trie node found") - data, err := pb.EncodeQuaiResponse(id, trieNode) + data, err := pb.EncodeQuaiResponse(id, loc, trieNode) if err != nil { return err } From 83276a45ed14eca65bb1252f84507a5d5087f630 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 11:20:53 -0600 Subject: [PATCH 07/10] Remove read deadline due to stream reuse --- common/stream_services.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/common/stream_services.go b/common/stream_services.go index b670c8bae5..51a0f3b74e 100644 --- a/common/stream_services.go +++ b/common/stream_services.go @@ -17,11 +17,6 @@ const ( // Reads the message from the stream and returns a byte of data. func ReadMessageFromStream(stream network.Stream) ([]byte, error) { - // Set a read deadline - if err := stream.SetReadDeadline(time.Now().Add(C_STREAM_TIMEOUT)); err != nil { - return nil, errors.Wrap(err, "failed to set read deadline") - } - // First read the length of the incoming message lenBytes := make([]byte, 4) if _, err := io.ReadFull(stream, lenBytes); err != nil { From d33b8185d68f6357577a76e624ede91bff538235 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 11:23:10 -0600 Subject: [PATCH 08/10] Make the p2p host functionality available to the peer manager --- p2p/node/node.go | 9 +++++++-- p2p/peerManager/peerManager.go | 6 ++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/p2p/node/node.go b/p2p/node/node.go index b1faf7e341..85fb1a3cbd 100644 --- a/p2p/node/node.go +++ b/p2p/node/node.go @@ -193,7 +193,7 @@ func NewNode(ctx context.Context) (*P2PNode, error) { "headers": createCache(c_defaultCacheSize), } - return &P2PNode{ + p2p := &P2PNode{ ctx: ctx, Host: host, bootpeers: bootpeers, @@ -202,7 +202,12 @@ func NewNode(ctx context.Context) (*P2PNode, error) { peerManager: peerMgr, requestManager: requestManager.NewManager(), cache: cache, - }, nil + } + + // Set the peer manager's backend to the host + peerMgr.SetP2PBackend(p2p) + + return p2p, nil } func createCache(size int) *lru.Cache[common.Hash, interface{}] { diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index e25913117d..368aae14b7 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -85,6 +85,8 @@ type BasicPeerManager struct { *basicConnGater.BasicConnectionGater *basicConnMgr.BasicConnMgr + p2pBackend quaiprotocol.QuaiP2PNode + selfID p2p.PeerID bestPeersDB *peerdb.PeerDB @@ -148,6 +150,10 @@ func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error { return nil } +func (pm *BasicPeerManager) SetP2PBackend(host quaiprotocol.QuaiP2PNode) { + pm.p2pBackend = host +} + func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) { pm.selfID = selfID From 92bb37ad93a457e33894fc20e1359f78103dfe14 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 12:09:19 -0600 Subject: [PATCH 09/10] Implement getrequestmanager --- p2p/node/p2p_services.go | 21 ++++++++++++++------- p2p/protocol/interface.go | 3 +++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/p2p/node/p2p_services.go b/p2p/node/p2p_services.go index 810a418db2..ed443604bd 100644 --- a/p2p/node/p2p_services.go +++ b/p2p/node/p2p_services.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multihash" @@ -12,7 +12,7 @@ import ( "github.com/dominant-strategies/go-quai/core/types" "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p/pb" - "github.com/dominant-strategies/go-quai/p2p/protocol" + "github.com/dominant-strategies/go-quai/p2p/requestManager" "github.com/dominant-strategies/go-quai/trie" ) @@ -26,10 +26,12 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data }).Trace("Requesting the data from peer") stream, err := p.NewStream(peerID) if err != nil { - // TODO: should we report this peer for failure to participate? + log.Global.WithFields(log.Fields{ + "peerId": peerID, + "error": err, + }).Error("Failed to open stream to peer") return nil, err } - defer stream.Close() // Get a new request ID id := p.requestManager.CreateRequest() @@ -40,9 +42,6 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data return nil, err } - // Start listening for the response - go p.readLoop(stream, location) - // Send the request to the peer err = common.WriteMessageToStream(stream, requestBytes) if err != nil { @@ -90,6 +89,14 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data return nil, errors.New("invalid response") } +func (p *P2PNode) GetRequestManager() requestManager.RequestManager { + return p.requestManager +} + +func (p *P2PNode) GetHostBackend() host.Host { + return p.Host +} + // Creates a Cid from a location to be used as DHT key func locationToCid(location common.Location) cid.Cid { sliceBytes := []byte(location.Name()) diff --git a/p2p/protocol/interface.go b/p2p/protocol/interface.go index 88d6a2c042..2156417076 100644 --- a/p2p/protocol/interface.go +++ b/p2p/protocol/interface.go @@ -24,4 +24,7 @@ type QuaiP2PNode interface { GetHeader(hash common.Hash, location common.Location) *types.Header GetBlockHashByNumber(number *big.Int, location common.Location) *common.Hash GetTrieNode(hash common.Hash, location common.Location) *trie.TrieNodeResponse + GetRequestManager() requestManager.RequestManager + GetHostBackend() host.Host + } From 57c32e75e96ed2144f5d6c7e93ba7376dfa72bf8 Mon Sep 17 00:00:00 2001 From: Hussam Date: Tue, 27 Feb 2024 12:10:12 -0600 Subject: [PATCH 10/10] Implement stream cache, fix the peer pruning process --- p2p/node/api.go | 5 +-- p2p/peerManager/peerManager.go | 80 +++++++++++++++++++++++++++++++--- p2p/protocol/interface.go | 9 ++-- 3 files changed, 80 insertions(+), 14 deletions(-) diff --git a/p2p/node/api.go b/p2p/node/api.go index de25193382..307759f186 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" "github.com/dominant-strategies/go-quai/common" ) @@ -230,8 +229,8 @@ func (p *P2PNode) GetBootPeers() []peer.AddrInfo { } // Opens a new stream to the given peer using the given protocol ID -func (p *P2PNode) NewStream(peerID peer.ID, protocolID protocol.ID) (network.Stream, error) { - return p.Host.NewStream(p.ctx, peerID, protocolID) +func (p *P2PNode) NewStream(peerID peer.ID) (network.Stream, error) { + return p.peerManager.GetStream(peerID) } // Connects to the given peer diff --git a/p2p/peerManager/peerManager.go b/p2p/peerManager/peerManager.go index 368aae14b7..fc1bb804ff 100644 --- a/p2p/peerManager/peerManager.go +++ b/p2p/peerManager/peerManager.go @@ -6,17 +6,22 @@ import ( "strings" "sync" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" + "github.com/dominant-strategies/go-quai/log" "github.com/dominant-strategies/go-quai/p2p" + quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" - basicConnGater "github.com/libp2p/go-libp2p/p2p/net/conngater" - basicConnMgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/dominant-strategies/go-quai/p2p/peerManager/peerdb" "github.com/libp2p/go-libp2p/core/connmgr" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + basicConnGater "github.com/libp2p/go-libp2p/p2p/net/conngater" + basicConnMgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" ) const ( @@ -26,6 +31,9 @@ const ( // The number of peers to return when querying for peers c_peerCount = 3 + // The amount of redundancy for open streams + // c_peerCount * c_streamReplicationFactor = total number of open streams + c_streamReplicationFactor = 3 // Dir names for the peerDBs c_bestDBName = "bestPeersDB" @@ -33,6 +41,10 @@ const ( c_lastResortDBName = "lastResortPeersDB" ) +var ( + errStreamNotFound = errors.New("stream not found") +) + // PeerManager is an interface that extends libp2p Connection Manager and Gater type PeerManager interface { connmgr.ConnManager @@ -52,8 +64,10 @@ type PeerManager interface { AddPeer(p2p.PeerID) error // Removes a peer from all the quality buckets RemovePeer(p2p.PeerID) error + // Returns an existing stream with that peer or opens a new one + GetStream(p peer.ID) (network.Stream, error) - // Returns c_recipientCount of the highest quality peers: lively & resposnive + // Returns c_recipientCount of the highest quality peers: lively & responsive GetBestPeersWithFallback() []p2p.PeerID // Returns c_recipientCount responsive, but less lively peers GetResponsivePeersWithFallback() []p2p.PeerID @@ -86,6 +100,7 @@ type BasicPeerManager struct { *basicConnMgr.BasicConnMgr p2pBackend quaiprotocol.QuaiP2PNode + streamCache *lru.Cache selfID p2p.PeerID @@ -122,13 +137,22 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data return nil, err } + lruCache, err := lru.NewWithEvict( + c_peerCount*c_streamReplicationFactor, + severStream, + ) + if err != nil { + return nil, err + } + return &BasicPeerManager{ + ctx: ctx, + streamCache: lruCache, BasicConnMgr: mgr, BasicConnectionGater: gater, bestPeersDB: bestPeersDB, responsivePeersDB: responsivePeersDB, lastResortPeers: lastResortPeers, - ctx: ctx, }, nil } @@ -136,8 +160,16 @@ func (pm *BasicPeerManager) AddPeer(peerID p2p.PeerID) error { return pm.recategorizePeer(peerID) } -// Removes peer from the bucket it is in. Does not return an error if the peer is not found func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error { + err := pm.removePeerFromDBs(peerID) + if err != nil { + return err + } + return pm.prunePeerConnection(peerID) +} + +// Removes peer from the bucket it is in. Does not return an error if the peer is not found +func (pm *BasicPeerManager) removePeerFromDBs(peerID p2p.PeerID) error { key := datastore.NewKey(peerID.String()) dbs := []*peerdb.PeerDB{pm.bestPeersDB, pm.responsivePeersDB, pm.lastResortPeers} @@ -147,13 +179,47 @@ func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error { return db.Delete(pm.ctx, key) } } - return nil } + +func (pm *BasicPeerManager) prunePeerConnection(peerID p2p.PeerID) error { + stream, ok := pm.streamCache.Get(peerID) + if ok { + log.Global.WithField("peerID", peerID).Debug("Pruned connection with peer") + severStream(peerID, stream) + return nil + } + return errStreamNotFound +} + +func severStream(key interface{}, value interface{}) { + stream := value.(network.Stream) + stream.Close() +} + func (pm *BasicPeerManager) SetP2PBackend(host quaiprotocol.QuaiP2PNode) { pm.p2pBackend = host } +func (pm *BasicPeerManager) GetStream(peerID p2p.PeerID) (network.Stream, error) { + stream, ok := pm.streamCache.Get(peerID) + var err error + if !ok { + // Create a new stream to the peer and register it in the cache + stream, err = pm.p2pBackend.GetHostBackend().NewStream(pm.ctx, peerID, quaiprotocol.ProtocolVersion) + if err != nil { + // Explicitly return nil here to avoid casting a nil later + return nil, err + } + pm.streamCache.Add(peerID, stream) + go quaiprotocol.QuaiProtocolHandler(stream.(network.Stream), pm.p2pBackend) + log.Global.Debug("Had to create new stream") + } else { + log.Global.Debug("Requested stream was found in cache") + } + + return stream.(network.Stream), err +} func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) { pm.selfID = selfID @@ -267,7 +333,7 @@ func (pm *BasicPeerManager) recategorizePeer(peer p2p.PeerID) error { responsiveness := pm.calculatePeerResponsiveness(peer) // remove peer from DB first - err := pm.RemovePeer(peer) + err := pm.removePeerFromDBs(peer) if err != nil { return err } diff --git a/p2p/protocol/interface.go b/p2p/protocol/interface.go index 2156417076..cacea99841 100644 --- a/p2p/protocol/interface.go +++ b/p2p/protocol/interface.go @@ -3,21 +3,19 @@ package protocol import ( "math/big" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/network" "github.com/dominant-strategies/go-quai/common" "github.com/dominant-strategies/go-quai/core/types" + "github.com/dominant-strategies/go-quai/p2p/requestManager" "github.com/dominant-strategies/go-quai/trie" ) // interface required to join the quai protocol network type QuaiP2PNode interface { GetBootPeers() []peer.AddrInfo - Connect(pi peer.AddrInfo) error - NewStream(peerID peer.ID) (network.Stream, error) - Network() network.Network // Search for a block in the node's cache, or query the consensus backend if it's not found in cache. // Returns nil if the block is not found. GetBlock(hash common.Hash, location common.Location) *types.Block @@ -27,4 +25,7 @@ type QuaiP2PNode interface { GetRequestManager() requestManager.RequestManager GetHostBackend() host.Host + Connect(peer.AddrInfo) error + NewStream(peer.ID) (network.Stream, error) + Network() network.Network }