Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Stream manager #114

Merged
merged 10 commits into from
Feb 27, 2024
5 changes: 0 additions & 5 deletions common/stream_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ const (

// Reads the message from the stream and returns a byte of data.
func ReadMessageFromStream(stream network.Stream) ([]byte, error) {
// Set a read deadline
if err := stream.SetReadDeadline(time.Now().Add(C_STREAM_TIMEOUT)); err != nil {
alejoacosta74 marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.Wrap(err, "failed to set read deadline")
}

// First read the length of the incoming message
lenBytes := make([]byte, 4)
if _, err := io.ReadFull(stream, lenBytes); err != nil {
Expand Down
30 changes: 24 additions & 6 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
}
}
// Try to replace an existing transaction in the pending pool
from, _ := types.Sender(pool.signer, tx) // already validated
from, err := types.Sender(pool.signer, tx) // already validated
if err != nil {
return false, err
}
internal, err := from.InternalAddress()
if err != nil {
return false, err
Expand Down Expand Up @@ -876,7 +879,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
from, err := types.Sender(pool.signer, tx) // already validated
if err != nil {
return false, err
}
internal, err := from.InternalAddress()
if err != nil {
return false, err
Expand Down Expand Up @@ -1185,7 +1191,10 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
if tx == nil {
continue
}
from, _ := types.Sender(pool.signer, tx) // already validated
from, err := types.Sender(pool.signer, tx) // already validated
if err != nil {
continue
}
internal, err := from.InternalAddress()
if err != nil {
continue
Expand Down Expand Up @@ -1222,7 +1231,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
if tx == nil {
return
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
addr, err := types.Sender(pool.signer, tx) // already validated during insertion
if err != nil {
return
}
internal, err := addr.InternalAddress()
if err != nil {
return
Expand Down Expand Up @@ -1353,7 +1365,10 @@ func (pool *TxPool) scheduleReorgLoop() {
case tx := <-pool.queueTxEventCh:
// Queue up the event, but don't schedule a reorg. It's up to the caller to
// request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, tx)
addr, err := types.Sender(pool.signer, tx)
if err != nil {
continue
}
internal, err := addr.InternalAddress()
if err != nil {
pool.logger.WithField("err", err).Debug("Failed to queue transaction")
Expand Down Expand Up @@ -1444,7 +1459,10 @@ func (pool *TxPool) runReorg(done chan struct{}, cancel chan struct{}, reset *tx

// Notify subsystems for newly added transactions
for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx)
addr, err := types.Sender(pool.signer, tx)
if err != nil {
continue
}
internal, err := addr.InternalAddress()
if err != nil {
pool.logger.WithField("err", err).Debug("Failed to add transaction event")
Expand Down
5 changes: 4 additions & 1 deletion core/types/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,10 @@ func (r Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, num
// The contract address can be derived from the transaction itself
if r[i].ContractAddress.Equal(common.Address{}) && txs[i].To() == nil {
// Deriving the signer is expensive, only do if it's actually needed
from, _ := Sender(signer, txs[i])
from, err := Sender(signer, txs[i])
if err != nil {
return err
}
r[i].ContractAddress = crypto.CreateAddress(from, txs[i].Nonce(), txs[i].Data(), config.Location)
}
// The used gas can be calculated based on previous r
Expand Down
3 changes: 3 additions & 0 deletions core/types/stxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type SpentTxOut struct {
// countSpentOutputs returns the number of utxos the passed block spends.
func CountSpentOutputs(block *Block) int {
transactions := block.QiTransactions()
if len(transactions) == 0 {
return 0
}
if IsCoinBaseTx(transactions[0]) {
transactions = transactions[1:]
}
Expand Down
14 changes: 4 additions & 10 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,6 @@ func (tx *Transaction) ProtoDecode(protoTx *ProtoTransaction, location common.Lo
tx.SetInner(&itx)

case 1:
if protoTx.Nonce == nil {
return errors.New("missing required field 'Nonce' in ProtoTransaction")
}
if protoTx.Gas == nil {
return errors.New("missing required field 'Gas' in ProtoTransaction")
}
Expand All @@ -278,12 +275,6 @@ func (tx *Transaction) ProtoDecode(protoTx *ProtoTransaction, location common.Lo
if protoTx.To == nil {
return errors.New("missing required field 'To' in ProtoTransaction")
}
if protoTx.GasFeeCap == nil {
return errors.New("missing required field 'GasFeeCap' in ProtoTransaction")
}
if protoTx.GasTipCap == nil {
return errors.New("missing required field 'GasTipCap' in ProtoTransaction")
}

var etx ExternalTx
etx.AccessList = AccessList{}
Expand Down Expand Up @@ -950,7 +941,10 @@ func NewTransactionsByPriceAndNonce(signer Signer, etxs []*Transaction, txs map[
}

for from, accTxs := range txs {
acc, _ := Sender(signer, accTxs[0])
acc, err := Sender(signer, accTxs[0])
if err != nil {
continue
}
wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee, nil)
// Remove transaction if sender doesn't match from, or if wrapping fails.
if acc.Bytes20() != from || err != nil {
Expand Down
6 changes: 6 additions & 0 deletions core/types/transaction_signing.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func Sender(signer Signer, tx *Transaction) (common.Address, error) {
if tx.Type() == ExternalTxType { // External TX does not have a signature
return tx.inner.(*ExternalTx).Sender, nil
}
if tx.Type() == QiTxType {
return common.Zero, errors.New("sender field not available for qi type")
}
if sc := tx.from.Load(); sc != nil {
sigCache := sc.(sigCache)
// If the signer used to derive from in a previous
Expand Down Expand Up @@ -169,6 +172,9 @@ func (s SignerV1) Sender(tx *Transaction) (common.Address, error) {
if tx.Type() == ExternalTxType { // External TX does not have a signature
return tx.inner.(*ExternalTx).Sender, nil
}
if tx.Type() == QiTxType {
return common.Zero, errors.New("cannot find the sender for a qi transaction type")
}
V, R, S := tx.GetEcdsaSignatureValues()
// DynamicFee txs are defined to use 0 and 1 as their recovery
// id, add 27 to become equivalent to unprotected signatures.
Expand Down
5 changes: 4 additions & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,10 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
// during transaction acceptance is the transaction pool.
//
// We use the signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)
from, err := types.Sender(env.signer, tx)
if err != nil {
continue
}
// Start executing the transaction
env.state.Prepare(tx.Hash(), env.tcount)

Expand Down
5 changes: 2 additions & 3 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/dominant-strategies/go-quai/common"
)
Expand Down Expand Up @@ -230,8 +229,8 @@ func (p *P2PNode) GetBootPeers() []peer.AddrInfo {
}

// Opens a new stream to the given peer using the given protocol ID
func (p *P2PNode) NewStream(peerID peer.ID, protocolID protocol.ID) (network.Stream, error) {
return p.Host.NewStream(p.ctx, peerID, protocolID)
func (p *P2PNode) NewStream(peerID peer.ID) (network.Stream, error) {
return p.peerManager.GetStream(peerID)
}

// Connects to the given peer
Expand Down
18 changes: 10 additions & 8 deletions p2p/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,9 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
return nil, err
}

peerID, err := peer.IDFromPublicKey(getNodeKey().GetPublic())
if err != nil {
log.Global.Fatalf("error getting self ID: %s", err)
return nil, err
}
// Peer manager handles both connection management and connection gating
peerMgr, err := peerManager.NewManager(
ctx,
peerID,
viper.GetInt(utils.MaxPeersFlag.Name), // LowWater
2*viper.GetInt(utils.MaxPeersFlag.Name), // HighWater
nil,
Expand Down Expand Up @@ -183,6 +177,9 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
nodeID := host.ID()
log.Global.Infof("node created: %s", nodeID)

// Set peer manager's self ID
peerMgr.SetSelfID(nodeID)

// Create a gossipsub instance with helper functions
ps, err := pubsubManager.NewGossipSubManager(ctx, host)

Expand All @@ -196,7 +193,7 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
"headers": createCache(c_defaultCacheSize),
}

return &P2PNode{
p2p := &P2PNode{
ctx: ctx,
Host: host,
bootpeers: bootpeers,
Expand All @@ -205,7 +202,12 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
peerManager: peerMgr,
requestManager: requestManager.NewManager(),
cache: cache,
}, nil
}

// Set the peer manager's backend to the host
peerMgr.SetP2PBackend(p2p)

return p2p, nil
}

func createCache(size int) *lru.Cache[common.Hash, interface{}] {
Expand Down
44 changes: 12 additions & 32 deletions p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"errors"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p/pb"
"github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/p2p/requestManager"
"github.com/dominant-strategies/go-quai/trie"
)

Expand All @@ -24,12 +24,14 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
"data": data,
"datatype": datatype,
}).Trace("Requesting the data from peer")
stream, err := p.NewStream(peerID, protocol.ProtocolVersion)
stream, err := p.NewStream(peerID)
if err != nil {
// TODO: should we report this peer for failure to participate?
log.Global.WithFields(log.Fields{
"peerId": peerID,
"error": err,
}).Error("Failed to open stream to peer")
return nil, err
}
defer stream.Close()

// Get a new request ID
id := p.requestManager.CreateRequest()
Expand All @@ -40,9 +42,6 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
return nil, err
}

// Start listening for the response
go p.readLoop(stream, location)

// Send the request to the peer
err = common.WriteMessageToStream(stream, requestBytes)
if err != nil {
Expand Down Expand Up @@ -90,31 +89,12 @@ func (p *P2PNode) requestFromPeer(peerID peer.ID, location common.Location, data
return nil, errors.New("invalid response")
}

func (p *P2PNode) readLoop(stream network.Stream, location common.Location) {
for {
message, err := common.ReadMessageFromStream(stream)
if err != nil {
return
}

recvdID, recvdType, err := pb.DecodeQuaiResponse(message, location)
if err != nil {
log.Global.WithField(
"err", err,
).Errorf("error decoding quai response: %s", err)
return
}
func (p *P2PNode) GetRequestManager() requestManager.RequestManager {
return p.requestManager
}

dataChan, err := p.requestManager.GetRequestChan(recvdID)
if err != nil {
log.Global.WithFields(log.Fields{
"requestID": recvdID,
"err": err,
}).Error("error associating request ID with data channel")
return
}
dataChan <- recvdType
}
func (p *P2PNode) GetHostBackend() host.Host {
return p.Host
}

// Creates a Cid from a location to be used as DHT key
Expand Down
Loading
Loading