Skip to content

Commit

Permalink
qa: extend and harden p2p-tx-download
Browse files Browse the repository at this point in the history
- Implements getmocktime instead of time.sleep to prevent races
- Enforce success of all mocktime operations
- Do not increment mocktime with 0 seconds
- Use fresh incoming peers for each test
- Explicitly do test teardown
- Remove TXID_RELAY_DELAY as this no longer exists in the dogecoind
  implementation
- Add the inflight throttling test to replace the removed inflight
  limit test
- Add an expiry fallback test
  • Loading branch information
patricklodder committed Jul 15, 2024
1 parent 8eb5214 commit a5a4e4b
Showing 1 changed file with 174 additions and 48 deletions.
222 changes: 174 additions & 48 deletions qa/rpc-tests/p2p-tx-download.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
GETDATA_TX_INTERVAL = 30 # seconds
TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL # 5 minutes
INBOUND_PEER_TX_DELAY = 2 # seconds
TXID_RELAY_DELAY = 2 # seconds
OVERLOADED_PEER_DELAY = 2 # seconds
MAX_GETDATA_IN_FLIGHT = 100
MAX_PEER_TX_ANNOUNCEMENTS = 5000

MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY

class TxDownloadTestNode(SingleNodeConnCB):
def __init__(self):
SingleNodeConnCB.__init__(self)
Expand Down Expand Up @@ -87,6 +89,8 @@ def run_test(self):
self.test_disconnect_fallback()
self.test_notfound_fallback()
self.test_max_announcements()
self.test_inflight_throttling()
self.test_expiry_fallback()

def setup_network(self):
# set up full nodes
Expand All @@ -100,21 +104,31 @@ def setup_network(self):
connect_nodes(self.nodes[0], 1)
self.sync_all()

# set up incoming (non-honest) peers
self.incoming_peers = []
for i in range(8):
self.incoming_peers.append(self.create_testnode())

# create a single control peer that is only used for ping sync
self.control_peer = self.create_testnode()
NetworkThread().start()
for peer in self.incoming_peers:
peer.wait_for_verack()
self.control_peer.wait_for_verack()

def create_testnode(self, node_idx=0):
node = TxDownloadTestNode()
conn = NodeConn('127.0.0.1', p2p_port(node_idx), self.nodes[node_idx], node)
node.add_connection(conn)
return node

def connect_incoming_peers(self, num):
peers = []
for _ in range(num):
peer = self.create_testnode()
peer.wait_for_verack()
peers.append(peer)
return peers

def disconnect_incoming_peers(self, peers):
for peer in peers:
if not peer.disconnect():
return False
return True

def any_received_getdata(self, hash, peers):
for peer in peers:
if hash in peer.tx_getdata_received:
Expand All @@ -135,6 +149,11 @@ def getdata_received():
return True
return wait_until(getdata_received, timeout=10)

def wait_for_mocktime(self, node):
def mocktime_is_good():
return node.getmocktime() >= self.mocktime
return wait_until(mocktime_is_good, timeout=10)

def find_winning_peer(self, peers, hash):
# detect which peer won a race for getting a getdata hash
selected = None
Expand All @@ -154,40 +173,53 @@ def forward_mocktime(self, delta_time):
self.mocktime += delta_time
for node in self.nodes:
node.setmocktime(self.mocktime)
# give the nodes some time to process the new mocktime
# can be removed when we have getmocktime
time.sleep(0.1)
if not self.wait_for_mocktime(node):
return False
# sync the control peer with ping so that we're 100% sure we have
# entered a new message handling loop
self.control_peer.sync_with_ping()
return True

def forward_mocktime_step2(self, iterations):
# forward mocktime in steps of 2 seconds to allow the nodes
# time to recognize they have to do something
for i in range(iterations):
self.forward_mocktime(2)
if not self.forward_mocktime(2):
return False
return True

def next_fake_txid(self):
self.fake_txid += 1
return self.fake_txid

def test_tx_request(self):
txid = self.next_fake_txid()
self.forward_mocktime(0)
assert self.forward_mocktime(1)

# use incoming peers 0 and 1
peerset = self.incoming_peers[0:4]
for peer in peerset:
# use 4 peers
peers = self.connect_incoming_peers(4)
for peer in peers:
peer.send_tx_inv([txid])

# To make sure we eventually ask the tx from all 4 nodes that announced
# use 2 more peers that do not send invs
otherpeers = self.connect_incoming_peers(2)

# To make sure we eventually ask the tx from all 4 peers that announced
# to us, we're now jumping 4 * (2 + 2 + 30) = 136 seconds to the future
warp = 4 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + GETDATA_TX_INTERVAL)
warp = 4 * MAX_GETDATA_INBOUND_WAIT
self.forward_mocktime_step2(warp//2)

# All peers that sent the inv should now have received a getdata request
assert self.wait_for_getdata([txid], peerset)
assert self.wait_for_getdata([txid], peers)

# Make sure the other peers did not receive the getdata because they
# didn't indicate they have the tx
assert not self.any_received_getdata(txid, self.incoming_peers[4:8])
assert not self.any_received_getdata(txid, otherpeers)

# cleanup
assert self.disconnect_incoming_peers(peers)
assert self.disconnect_incoming_peers(otherpeers)
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_invblock_resolution(self):
inputs = [self.nodes[1].listunspent()[0]]
Expand All @@ -198,93 +230,187 @@ def test_invblock_resolution(self):
tx.rehash()
txid = int(tx.hash, 16)

self.forward_mocktime(0)
assert self.forward_mocktime(1)

# make sure that node 1 is outbound for node 0
assert self.nodes[0].getpeerinfo()[0]['inbound'] == False

# use all peers that only inv but never respond to getdata
for peer in self.incoming_peers:
# use 8 peers that only inv but never respond to getdata
peers = self.connect_incoming_peers(8)
for peer in peers:
peer.send_tx_inv([txid])

# send from our honest node last
self.nodes[1].sendrawtransaction(tx_hex)

# We jump forward 2x (2 + 2) + 30 + 2 (margin) = 40 seconds to make sure
# We jump forward 2x max inbound wait time to make sure
# that we get to the point where we re-evaluate the transaction in 2
# second steps
warp = 2 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY) + GETDATA_TX_INTERVAL + 2
self.forward_mocktime_step2(warp//2)
warp = 2 * MAX_GETDATA_INBOUND_WAIT
assert self.forward_mocktime_step2(warp//2)

assert tx.hash in self.nodes[0].getrawmempool()

assert self.disconnect_incoming_peers(peers)
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_inflight_throttling(self):
# First, forward time by 2x inflight timeout, so that we have clean
# registers for each peer
self.forward_mocktime(2 * TX_EXPIRY_INTERVAL)

# now send MAX_GETDATA_IN_FLIGHT (=100) invs with 1 peer
peer = self.connect_incoming_peers(1)[0]
invs = []
for i in range(MAX_GETDATA_IN_FLIGHT):
txid = self.next_fake_txid()
invs.append(txid)

peer.send_tx_inv(invs)

# warp forward 3 seconds in steps of 1 second
warp = INBOUND_PEER_TX_DELAY + 1
for _ in range(warp):
assert self.forward_mocktime(1)
peer.sync_with_ping()

# test that we got all the getdata
assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT)

peer.send_tx_inv([self.next_fake_txid()])

# warp forward 3 seconds again
warp = INBOUND_PEER_TX_DELAY + 1
for _ in range(warp):
assert self.forward_mocktime(1)
peer.sync_with_ping()

# test that we haven't received the getdata request yet
assert len(peer.tx_getdata_received) == MAX_GETDATA_IN_FLIGHT

# additionally warp the overloaded peer delay time second margin
warp = OVERLOADED_PEER_DELAY
for _ in range(warp):
assert self.forward_mocktime(1)
peer.sync_with_ping()

# test that we now received the getdata
assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT + 1)

# cleanup
assert self.disconnect_incoming_peers([peer])
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_expiry_fallback(self):
# create 2 new peers
peers = self.connect_incoming_peers(2)

txid = self.next_fake_txid()
assert self.forward_mocktime(1)

for peer in peers:
peer.send_tx_inv([txid])

# warp forward 2 + 2 (margin) = 4 seconds in steps of 2
warp = INBOUND_PEER_TX_DELAY + 2
assert self.forward_mocktime_step2(warp//2)

winner, loser = self.find_winning_peer(peers, txid)

# expire the request from the winning peer by doing nothing
assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2)

# the losing peer is now the fallback and received a getdata message
assert self.wait_for_getdata([txid], [loser])

#cleanup
assert self.disconnect_incoming_peers(peers)
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_notfound_fallback(self):
# use peer 4 and 5 to concurrently send 2 invs
peers = self.incoming_peers[4:6]
# use 2 peers to concurrently send 2 invs
peers = self.connect_incoming_peers(2)
txid = self.next_fake_txid()
self.forward_mocktime(1)
assert self.forward_mocktime(1)

for peer in peers:
peer.send_tx_inv([txid])

# warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2
warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2
self.forward_mocktime_step2(warp//2)
# warp forward 2 + 2 (margin) = 4 seconds in steps of 2
warp = INBOUND_PEER_TX_DELAY + 2
assert self.forward_mocktime_step2(warp//2)

winner, loser = self.find_winning_peer(peers, txid)

# send a reject message from the peer that won the race
winner.send_tx_notfound([txid])

# warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2
warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2
self.forward_mocktime_step2(warp//2)
# warp forward the max wait time in steps of 2
assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2)

# the losing peer is now the fallback and received a getdata message
assert self.wait_for_getdata([txid], [loser])

#cleanup
assert self.disconnect_incoming_peers(peers)
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_disconnect_fallback(self):
# use peer 6 and 7 to concurrently send 2 invs
peers = self.incoming_peers[6:8]
# use 2 peers to concurrently send 2 invs
peers = self.connect_incoming_peers(2)
txid = self.next_fake_txid()
self.forward_mocktime(1)
assert self.forward_mocktime(1)

for peer in peers:
peer.send_tx_inv([txid])

# warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2
warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2
self.forward_mocktime_step2(warp//2)
# warp forward 2 + 2 (margin) = 4 seconds in steps of 2
warp = INBOUND_PEER_TX_DELAY + 2
assert self.forward_mocktime_step2(warp//2)

winner, loser = self.find_winning_peer(peers, txid)

# drop connection from the peer that won the race
assert winner.disconnect()

# warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2
warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2
self.forward_mocktime_step2(warp//2)
# warp forward the max wait time in steps of 2
assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2)

# the losing peer is now the fallback and received a getdata message
assert self.wait_for_getdata([txid], [loser])

#cleanup
assert self.disconnect_incoming_peers(peers)
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

def test_max_announcements(self):
# create a test node
peer = self.create_testnode()
peer.wait_for_verack()
peer = self.connect_incoming_peers(1)[0]

assert self.forward_mocktime(1)

hashes = []
for _ in range(MAX_PEER_TX_ANNOUNCEMENTS):
hashes.append(self.next_fake_txid())

peer.send_tx_inv(hashes)
peer.wait_until_numgetdata(MAX_PEER_TX_ANNOUNCEMENTS)

# wait the maximum time before expiry minus 2 seconds to receive all
# getdata requests with this peer
warp = MAX_GETDATA_INBOUND_WAIT - 2
assert self.forward_mocktime_step2(warp//2)
assert peer.wait_until_numgetdata(MAX_PEER_TX_ANNOUNCEMENTS)
peer.sync_with_ping()

# send one more - this should never come back.
# send one more and wait the maximum time - this should never come back.
extratx = self.next_fake_txid()
peer.send_tx_inv([extratx])
assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2)
assert not self.any_received_getdata(extratx, [peer])

#cleanup
assert self.disconnect_incoming_peers([peer])
assert self.forward_mocktime(TX_EXPIRY_INTERVAL)

if __name__ == '__main__':
TxDownloadTest().main()

0 comments on commit a5a4e4b

Please sign in to comment.