Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TX reject messages not being counted anymore #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 190 additions & 23 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package neutrino

import (
"fmt"
"github.com/davecgh/go-spew/spew"
"sync"
"sync/atomic"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/gcash/bchd/blockchain"
"github.com/gcash/bchd/chaincfg/chainhash"
"github.com/gcash/bchd/wire"
Expand All @@ -33,6 +33,17 @@ var (
// to a peer that previously failed because of a timeout.
QueryPeerCooldown = time.Second * 5

// QueryRejectTimeout is the time we'll wait after sending a response to
// an INV query for a potential reject answer. If we don't get a reject
// before this delay, we assume the TX was accepted.
QueryRejectTimeout = time.Second

// QueryInvalidTxThreshold is the threshold for the fraction of peers
// that need to respond to a TX with a code of pushtx.Invalid to count
// it as invalid, even if not all peers respond. This currently
// corresponds to 60% of peers that need to reject.
QueryInvalidTxThreshold float32 = 0.6

// QueryNumRetries specifies how many times to retry sending a query to
// each peer before we've concluded we aren't going to get a valid
// response. This allows to make up for missed messages in some
Expand Down Expand Up @@ -81,6 +92,12 @@ type queryOptions struct {
// on a query in case we don't have any peers.
peerConnectTimeout time.Duration

// rejectTimeout is the time we'll wait after sending a response to an
// INV query for a potential reject answer. If we don't get a reject
// before this delay, we assume the TX was accepted. This option is only
// used when publishing a transaction.
rejectTimeout time.Duration

// encoding lets the query know which encoding to use when queueing
// messages to a peer.
encoding wire.MessageEncoding
Expand All @@ -95,6 +112,12 @@ type queryOptions struct {
// they're doing something like a key import.
persistToDisk bool

// invalidTxThreshold is the threshold for the fraction of peers
// that need to respond to a TX with a code of pushtx.Invalid to count
// it as invalid, even if not all peers respond. This option is only
// used when publishing a transaction.
invalidTxThreshold float32

// optimisticBatch indicates whether we expect more calls to follow,
// and that we should attempt to batch more items with the query such
// that they can be cached, avoiding the extra round trip.
Expand Down Expand Up @@ -131,7 +154,9 @@ func defaultQueryOptions() *queryOptions {
timeout: QueryTimeout,
numRetries: uint8(QueryNumRetries),
peerConnectTimeout: QueryPeerConnectTimeout,
rejectTimeout: QueryRejectTimeout,
encoding: QueryEncoding,
invalidTxThreshold: QueryInvalidTxThreshold,
optimisticBatch: noBatch,
}
}
Expand Down Expand Up @@ -159,6 +184,17 @@ func NumRetries(numRetries uint8) QueryOption {
}
}

// InvalidTxThreshold is the threshold for the fraction of peers that need to
// respond to a TX with a code of pushtx.Invalid to count it as invalid, even
// if not all peers respond.
//
// NOTE: This option is currently only used when publishing a transaction.
func InvalidTxThreshold(invalidTxThreshold float32) QueryOption {
return func(qo *queryOptions) {
qo.invalidTxThreshold = invalidTxThreshold
}
}

// PeerConnectTimeout is a query option that lets the query know how long to
// wait for the underlying chain service to connect to a peer before giving up
// on a query in case we don't have any peers.
Expand All @@ -168,6 +204,17 @@ func PeerConnectTimeout(timeout time.Duration) QueryOption {
}
}

// RejectTimeout is the time we'll wait after sending a response to an INV
// query for a potential reject answer. If we don't get a reject before this
// delay, we assume the TX was accepted.
//
// NOTE: This option is currently only used when publishing a transaction.
func RejectTimeout(rejectTimeout time.Duration) QueryOption {
return func(qo *queryOptions) {
qo.rejectTimeout = rejectTimeout
}
}

// Encoding is a query option that allows the caller to set a message encoding
// for the query messages.
func Encoding(encoding wire.MessageEncoding) QueryOption {
Expand Down Expand Up @@ -1396,18 +1443,33 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
inv := wire.NewMsgInv()
inv.AddInvVect(wire.NewInvVect(invType, &txHash))

// We'll gather all of the peers who replied to our query, along with
// We'll gather all the peers who replied to our query, along with
// the ones who rejected it and their reason for rejecting it. We'll use
// this to determine whether our transaction was actually rejected.
numReplied := 0
rejections := make(map[pushtx.BroadcastError]int)
replies := make(map[int32]struct{})
rejections := make(map[int32]*pushtx.BroadcastError)
rejectCodes := make(map[pushtx.BroadcastErrorCode]int)

// closers is a map that tracks the delayed closers we need to make sure
// the peer quit channel is closed after a timeout.
closers := make(map[int32]*delayedCloser)

// Send the peer query and listen for getdata.
s.queryAllPeers(
inv,
func(sp *ServerPeer, resp wire.Message, quit chan<- struct{},
peerQuit chan<- struct{}) {

// The "closer" can be used to either close the peer
// quit channel after a certain timeout or immediately.
closer, ok := closers[sp.ID()]
if !ok {
closer = newDelayedCloser(
peerQuit, qo.rejectTimeout,
)
closers[sp.ID()] = closer
}

switch response := resp.(type) {
// A peer has replied with a GetData message, so we'll
// send them the transaction.
Expand All @@ -1418,7 +1480,24 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
tx, nil, qo.encoding,
)

numReplied++
// Peers might send the INV
// request multiple times, we
// need to de-duplicate them
// using a map.
replies[sp.ID()] = struct{}{}

// Okay, so the peer responded
// with an INV message, and we
// sent out the TX. If we never
// hear anything back from the
// peer it means they accepted
// the tx. If we get a reject,
// things are clear as well. But
// what if they are just slow to
// respond? We'll give them
// another bit of time to
// respond.
closer.closeEventually(s.quit)
}
}

Expand All @@ -1436,7 +1515,18 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
broadcastErr := pushtx.ParseBroadcastError(
response, sp.Addr(),
)
rejections[*broadcastErr]++
rejections[sp.ID()] = broadcastErr
rejectCodes[broadcastErr.Code]++

log.Debugf("Transaction %v rejected by peer "+
"%v: code = %v, reason = %q", txHash,
sp.Addr(), broadcastErr.Code,
broadcastErr.Reason)

// A reject message is final, so we can close
// the peer quit channel now, we don't expect
// any further messages.
closer.closeNow()
}
},
// Default to 500ms timeout. Default for queryAllPeers is a
Expand All @@ -1455,39 +1545,116 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
// If none of our peers replied to our query, we'll avoid returning an
// error as the reliable broadcaster will take care of broadcasting this
// transaction upon every block connected/disconnected.
if numReplied == 0 {
if len(replies) == 0 {
log.Debugf("No peers replied to inv message for transaction %v",
tx.TxHash())
return nil
}

// firstRejectWithCode returns the first reject error that we have for
// a certain error code.
firstRejectWithCode := func(code pushtx.BroadcastErrorCode) error {
for _, broadcastErr := range rejections {
if broadcastErr.Code == code {
return broadcastErr
}
}

// We can't really get here unless something is totally wrong in
// the above error mapping code.
return fmt.Errorf("invalid error mapping")
}

// If all of our peers who replied to our query also rejected our
// transaction, we'll deem that there was actually something wrong with
// it so we'll return the most rejected error between all of our peers.
//
// TODO(wilmer): This might be too naive, some rejections are more
// critical than others.
//
// TODO(wilmer): This does not cover the case where a peer also rejected
// our transaction but didn't send the response within our given timeout
// and certain other cases. Due to this, we should probably decide on a
// threshold of rejections instead.
if numReplied == len(rejections) {
// it, so we'll return the most rejected error between all of our peers.
log.Debugf("Got replies from %d peers and %d rejections", len(replies),
len(rejections))
if len(replies) == len(rejections) {
log.Warnf("All peers rejected transaction %v checking errors",
tx.TxHash())

mostRejectedCount := 0
var mostRejectedErr pushtx.BroadcastError

for broadcastErr, count := range rejections {
// First, find the reject code that was returned most often.
var (
mostRejectedCount = 0
mostRejectedCode pushtx.BroadcastErrorCode
)
for code, count := range rejectCodes {
if count > mostRejectedCount {
mostRejectedCount = count
mostRejectedErr = broadcastErr
mostRejectedCode = code
}
}

return &mostRejectedErr
// Then return the first error we have for that code (it doesn't
// really matter which one, as long as our error code parsing is
// correct).
return firstRejectWithCode(mostRejectedCode)
}

// We did get some rejections, but not from all peers. Perhaps that's
// due to some peers responding too slowly. Or it could also be a
// malicious peer trying to block us from publishing a TX. That's why
// we want to check if more than 60% of the peers (by default) that
// replied in time also sent a reject, we can be pretty certain that
// this TX is probably invalid.
if len(rejections) > 0 {
numInvalid := float32(rejectCodes[pushtx.Invalid])
numPeersResponded := float32(len(replies))

log.Debugf("Of %d peers that replied, %d think the TX is "+
"invalid", numPeersResponded, numInvalid)

// 60% or more (by default) of the peers declared this TX as
// invalid.
if numInvalid/numPeersResponded >= qo.invalidTxThreshold {
log.Warnf("Threshold of %d reached (%d out of %d "+
"peers), declaring TX %v as invalid",
qo.invalidTxThreshold, numInvalid,
numPeersResponded, txHash)

return firstRejectWithCode(pushtx.Invalid)
}
}

return nil
}

// delayedCloser is a struct that makes sure a subject channel is closed at some
// point, either after a delay or immediately.
type delayedCloser struct {
subject chan<- struct{}
timeout time.Duration
once sync.Once
}

// newDelayedCloser creates a new delayed closer for the given subject channel.
func newDelayedCloser(subject chan<- struct{},
timeout time.Duration) *delayedCloser {

return &delayedCloser{
subject: subject,
timeout: timeout,
}
}

// closeEventually closes the subject channel after the configured timeout or
// immediately if the quit channel is closed.
func (t *delayedCloser) closeEventually(quit chan struct{}) {
go func() {
defer t.closeNow()

select {
case <-time.After(t.timeout):
case <-quit:
}
}()
}

// closeNow immediately closes the subject channel. This can safely be called
// multiple times as it will only attempt to close the channel at most once.
func (t *delayedCloser) closeNow() {
t.once.Do(func() {
close(t.subject)
})
}