diff --git a/query.go b/query.go index 89919fc..e06fad0 100644 --- a/query.go +++ b/query.go @@ -4,11 +4,11 @@ package neutrino import ( "fmt" - "github.com/davecgh/go-spew/spew" "sync" "sync/atomic" "time" + "github.com/davecgh/go-spew/spew" "github.com/gcash/bchd/blockchain" "github.com/gcash/bchd/chaincfg/chainhash" "github.com/gcash/bchd/wire" @@ -33,6 +33,17 @@ var ( // to a peer that previously failed because of a timeout. QueryPeerCooldown = time.Second * 5 + // QueryRejectTimeout is the time we'll wait after sending a response to + // an INV query for a potential reject answer. If we don't get a reject + // before this delay, we assume the TX was accepted. + QueryRejectTimeout = time.Second + + // QueryInvalidTxThreshold is the threshold for the fraction of peers + // that need to respond to a TX with a code of pushtx.Invalid to count + // it as invalid, even if not all peers respond. This currently + // corresponds to 60% of peers that need to reject. + QueryInvalidTxThreshold float32 = 0.6 + // QueryNumRetries specifies how many times to retry sending a query to // each peer before we've concluded we aren't going to get a valid // response. This allows to make up for missed messages in some @@ -81,6 +92,12 @@ type queryOptions struct { // on a query in case we don't have any peers. peerConnectTimeout time.Duration + // rejectTimeout is the time we'll wait after sending a response to an + // INV query for a potential reject answer. If we don't get a reject + // before this delay, we assume the TX was accepted. This option is only + // used when publishing a transaction. + rejectTimeout time.Duration + // encoding lets the query know which encoding to use when queueing // messages to a peer. encoding wire.MessageEncoding @@ -95,6 +112,12 @@ type queryOptions struct { // they're doing something like a key import. persistToDisk bool + // invalidTxThreshold is the threshold for the fraction of peers + // that need to respond to a TX with a code of pushtx.Invalid to count + // it as invalid, even if not all peers respond. This option is only + // used when publishing a transaction. + invalidTxThreshold float32 + // optimisticBatch indicates whether we expect more calls to follow, // and that we should attempt to batch more items with the query such // that they can be cached, avoiding the extra round trip. @@ -131,7 +154,9 @@ func defaultQueryOptions() *queryOptions { timeout: QueryTimeout, numRetries: uint8(QueryNumRetries), peerConnectTimeout: QueryPeerConnectTimeout, + rejectTimeout: QueryRejectTimeout, encoding: QueryEncoding, + invalidTxThreshold: QueryInvalidTxThreshold, optimisticBatch: noBatch, } } @@ -159,6 +184,17 @@ func NumRetries(numRetries uint8) QueryOption { } } +// InvalidTxThreshold is the threshold for the fraction of peers that need to +// respond to a TX with a code of pushtx.Invalid to count it as invalid, even +// if not all peers respond. +// +// NOTE: This option is currently only used when publishing a transaction. +func InvalidTxThreshold(invalidTxThreshold float32) QueryOption { + return func(qo *queryOptions) { + qo.invalidTxThreshold = invalidTxThreshold + } +} + // PeerConnectTimeout is a query option that lets the query know how long to // wait for the underlying chain service to connect to a peer before giving up // on a query in case we don't have any peers. @@ -168,6 +204,17 @@ func PeerConnectTimeout(timeout time.Duration) QueryOption { } } +// RejectTimeout is the time we'll wait after sending a response to an INV +// query for a potential reject answer. If we don't get a reject before this +// delay, we assume the TX was accepted. +// +// NOTE: This option is currently only used when publishing a transaction. +func RejectTimeout(rejectTimeout time.Duration) QueryOption { + return func(qo *queryOptions) { + qo.rejectTimeout = rejectTimeout + } +} + // Encoding is a query option that allows the caller to set a message encoding // for the query messages. func Encoding(encoding wire.MessageEncoding) QueryOption { @@ -1396,11 +1443,16 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e inv := wire.NewMsgInv() inv.AddInvVect(wire.NewInvVect(invType, &txHash)) - // We'll gather all of the peers who replied to our query, along with + // We'll gather all the peers who replied to our query, along with // the ones who rejected it and their reason for rejecting it. We'll use // this to determine whether our transaction was actually rejected. - numReplied := 0 - rejections := make(map[pushtx.BroadcastError]int) + replies := make(map[int32]struct{}) + rejections := make(map[int32]*pushtx.BroadcastError) + rejectCodes := make(map[pushtx.BroadcastErrorCode]int) + + // closers is a map that tracks the delayed closers we need to make sure + // the peer quit channel is closed after a timeout. + closers := make(map[int32]*delayedCloser) // Send the peer query and listen for getdata. s.queryAllPeers( @@ -1408,6 +1460,16 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e func(sp *ServerPeer, resp wire.Message, quit chan<- struct{}, peerQuit chan<- struct{}) { + // The "closer" can be used to either close the peer + // quit channel after a certain timeout or immediately. + closer, ok := closers[sp.ID()] + if !ok { + closer = newDelayedCloser( + peerQuit, qo.rejectTimeout, + ) + closers[sp.ID()] = closer + } + switch response := resp.(type) { // A peer has replied with a GetData message, so we'll // send them the transaction. @@ -1418,7 +1480,24 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e tx, nil, qo.encoding, ) - numReplied++ + // Peers might send the INV + // request multiple times, we + // need to de-duplicate them + // using a map. + replies[sp.ID()] = struct{}{} + + // Okay, so the peer responded + // with an INV message, and we + // sent out the TX. If we never + // hear anything back from the + // peer it means they accepted + // the tx. If we get a reject, + // things are clear as well. But + // what if they are just slow to + // respond? We'll give them + // another bit of time to + // respond. + closer.closeEventually(s.quit) } } @@ -1436,7 +1515,18 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e broadcastErr := pushtx.ParseBroadcastError( response, sp.Addr(), ) - rejections[*broadcastErr]++ + rejections[sp.ID()] = broadcastErr + rejectCodes[broadcastErr.Code]++ + + log.Debugf("Transaction %v rejected by peer "+ + "%v: code = %v, reason = %q", txHash, + sp.Addr(), broadcastErr.Code, + broadcastErr.Reason) + + // A reject message is final, so we can close + // the peer quit channel now, we don't expect + // any further messages. + closer.closeNow() } }, // Default to 500ms timeout. Default for queryAllPeers is a @@ -1455,39 +1545,116 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e // If none of our peers replied to our query, we'll avoid returning an // error as the reliable broadcaster will take care of broadcasting this // transaction upon every block connected/disconnected. - if numReplied == 0 { + if len(replies) == 0 { log.Debugf("No peers replied to inv message for transaction %v", tx.TxHash()) return nil } + // firstRejectWithCode returns the first reject error that we have for + // a certain error code. + firstRejectWithCode := func(code pushtx.BroadcastErrorCode) error { + for _, broadcastErr := range rejections { + if broadcastErr.Code == code { + return broadcastErr + } + } + + // We can't really get here unless something is totally wrong in + // the above error mapping code. + return fmt.Errorf("invalid error mapping") + } + // If all of our peers who replied to our query also rejected our // transaction, we'll deem that there was actually something wrong with - // it so we'll return the most rejected error between all of our peers. - // - // TODO(wilmer): This might be too naive, some rejections are more - // critical than others. - // - // TODO(wilmer): This does not cover the case where a peer also rejected - // our transaction but didn't send the response within our given timeout - // and certain other cases. Due to this, we should probably decide on a - // threshold of rejections instead. - if numReplied == len(rejections) { + // it, so we'll return the most rejected error between all of our peers. + log.Debugf("Got replies from %d peers and %d rejections", len(replies), + len(rejections)) + if len(replies) == len(rejections) { log.Warnf("All peers rejected transaction %v checking errors", tx.TxHash()) - mostRejectedCount := 0 - var mostRejectedErr pushtx.BroadcastError - - for broadcastErr, count := range rejections { + // First, find the reject code that was returned most often. + var ( + mostRejectedCount = 0 + mostRejectedCode pushtx.BroadcastErrorCode + ) + for code, count := range rejectCodes { if count > mostRejectedCount { mostRejectedCount = count - mostRejectedErr = broadcastErr + mostRejectedCode = code } } - return &mostRejectedErr + // Then return the first error we have for that code (it doesn't + // really matter which one, as long as our error code parsing is + // correct). + return firstRejectWithCode(mostRejectedCode) + } + + // We did get some rejections, but not from all peers. Perhaps that's + // due to some peers responding too slowly. Or it could also be a + // malicious peer trying to block us from publishing a TX. That's why + // we want to check if more than 60% of the peers (by default) that + // replied in time also sent a reject, we can be pretty certain that + // this TX is probably invalid. + if len(rejections) > 0 { + numInvalid := float32(rejectCodes[pushtx.Invalid]) + numPeersResponded := float32(len(replies)) + + log.Debugf("Of %d peers that replied, %d think the TX is "+ + "invalid", numPeersResponded, numInvalid) + + // 60% or more (by default) of the peers declared this TX as + // invalid. + if numInvalid/numPeersResponded >= qo.invalidTxThreshold { + log.Warnf("Threshold of %d reached (%d out of %d "+ + "peers), declaring TX %v as invalid", + qo.invalidTxThreshold, numInvalid, + numPeersResponded, txHash) + + return firstRejectWithCode(pushtx.Invalid) + } } return nil } + +// delayedCloser is a struct that makes sure a subject channel is closed at some +// point, either after a delay or immediately. +type delayedCloser struct { + subject chan<- struct{} + timeout time.Duration + once sync.Once +} + +// newDelayedCloser creates a new delayed closer for the given subject channel. +func newDelayedCloser(subject chan<- struct{}, + timeout time.Duration) *delayedCloser { + + return &delayedCloser{ + subject: subject, + timeout: timeout, + } +} + +// closeEventually closes the subject channel after the configured timeout or +// immediately if the quit channel is closed. +func (t *delayedCloser) closeEventually(quit chan struct{}) { + go func() { + defer t.closeNow() + + select { + case <-time.After(t.timeout): + case <-quit: + } + }() +} + +// closeNow immediately closes the subject channel. This can safely be called +// multiple times as it will only attempt to close the channel at most once. +func (t *delayedCloser) closeNow() { + t.once.Do(func() { + close(t.subject) + }) +}