diff --git a/testground/benchmark/benchmark/stateless.py b/testground/benchmark/benchmark/stateless.py index b836144475..5e4db2583a 100644 --- a/testground/benchmark/benchmark/stateless.py +++ b/testground/benchmark/benchmark/stateless.py @@ -13,6 +13,7 @@ import click import tomlkit +from . import transaction from .cli import ChainCommand from .echo import run_echo_server from .peer import ( @@ -23,7 +24,6 @@ init_node, patch_configs, ) -from .sendtx import prepare_txs, send_txs from .stats import dump_block_stats from .topology import connect_all from .types import PeerPacket @@ -184,7 +184,7 @@ def run(outdir: str, datadir: str, cronosd, global_seq): home = datadir / group / str(group_seq) try: - return do_run(home, cronosd, group, global_seq, cfg) + return do_run(datadir, home, cronosd, group, global_seq, cfg) finally: # collect outputs output = Path("/data.tar.bz2") @@ -198,10 +198,45 @@ def run(outdir: str, datadir: str, cronosd, global_seq): shutil.copy(output, filename) -def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict): +@cli.command() +@click.argument("outdir") +@click.option("--nodes", default=10) +@click.option("--num-accounts", default=10) +@click.option("--num-txs", default=1000) +def gen_txs(**kwargs): + return _gen_txs(**kwargs) + + +@cli.command() +@click.argument("options", callback=validate_json) +def generic_gen_txs(options: dict): + return _gen_txs(**options) + + +def _gen_txs( + outdir: str, + nodes: int = 10, + num_accounts: int = 10, + num_txs: int = 1000, +): + outdir = Path(outdir) + for global_seq in range(nodes): + print("generating", num_accounts * num_txs, "txs for node", global_seq) + txs = transaction.gen(global_seq, num_accounts, num_txs) + transaction.save(txs, outdir, global_seq) + print("saved", len(txs), "txs for node", global_seq) + + +def do_run( + datadir: Path, home: Path, cronosd: str, group: str, global_seq: int, cfg: dict +): if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True): - print("preparing", cfg["num_accounts"] * cfg["num_txs"], "txs") - txs = prepare_txs(global_seq, cfg["num_accounts"], cfg["num_txs"]) + txs = transaction.load(datadir, global_seq) + if txs: + print("loaded", len(txs), "txs") + else: + print("generating", cfg["num_accounts"] * cfg["num_txs"], "txs") + txs = transaction.gen(global_seq, cfg["num_accounts"], cfg["num_txs"]) else: txs = [] @@ -222,7 +257,8 @@ def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict): wait_for_block(cli, 3) if txs: - asyncio.run(send_txs(txs)) + asyncio.run(transaction.send(txs)) + print("sent", len(txs), "txs") # node quit when the chain is idle or halted for a while detect_idle_halted(20, 20) diff --git a/testground/benchmark/benchmark/sendtx.py b/testground/benchmark/benchmark/transaction.py similarity index 51% rename from testground/benchmark/benchmark/sendtx.py rename to testground/benchmark/benchmark/transaction.py index d3a5395aea..3d95c2709a 100644 --- a/testground/benchmark/benchmark/sendtx.py +++ b/testground/benchmark/benchmark/transaction.py @@ -1,17 +1,16 @@ import asyncio -import time +from pathlib import Path import aiohttp import ujson -import web3 -from eth_account import Account -from .utils import gen_account, send_transaction +from .utils import gen_account GAS_PRICE = 1000000000 CHAIN_ID = 777 LOCAL_JSON_RPC = "http://localhost:8545" CONNECTION_POOL_SIZE = 1024 +TXS_DIR = "txs" def test_tx(nonce: int): @@ -25,57 +24,35 @@ def test_tx(nonce: int): } -def sendtx(w3: web3.Web3, acct: Account, tx_amount: int): - initial_nonce = w3.eth.get_transaction_count(acct.address) - print( - "test begin, address:", - acct.address, - "balance:", - w3.eth.get_balance(acct.address), - "nonce:", - initial_nonce, - ) - - nonce = initial_nonce - while nonce < initial_nonce + tx_amount: - try: - send_transaction(w3, test_tx(nonce), acct, wait=False) - except ValueError as e: - msg = str(e) - if "invalid nonce" in msg: - print("invalid nonce and retry", nonce) - time.sleep(1) - continue - if "tx already in mempool" not in msg: - raise - - nonce += 1 - - if nonce % 100 == 0: - print(f"{acct.address} sent {nonce} transactions") - - print( - "test end, address:", - acct.address, - "balance:", - w3.eth.get_balance(acct.address), - "nonce:", - w3.eth.get_transaction_count(acct.address), - ) - - -def prepare_txs(global_seq, num_accounts, num_txs): +def gen(global_seq, num_accounts, num_txs) -> [str]: accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)] txs = [] for i in range(num_txs): for acct in accounts: txs.append(acct.sign_transaction(test_tx(i)).rawTransaction.hex()) if len(txs) % 1000 == 0: - print("prepared", len(txs), "txs") + print("generated", len(txs), "txs for node", global_seq) return txs +def save(txs: [str], datadir: Path, global_seq: int): + d = datadir / TXS_DIR + d.mkdir(parents=True, exist_ok=True) + path = d / f"{global_seq}.json" + with path.open("w") as f: + ujson.dump(txs, f) + + +def load(datadir: Path, global_seq: int) -> [str]: + path = datadir / TXS_DIR / f"{global_seq}.json" + if not path.exists(): + return + + with path.open("r") as f: + return ujson.load(f) + + async def async_sendtx(session, raw): async with session.post( LOCAL_JSON_RPC, @@ -91,7 +68,7 @@ async def async_sendtx(session, raw): print("send tx error", data["error"]) -async def send_txs(txs): +async def send(txs): connector = aiohttp.TCPConnector(limit=1024) async with aiohttp.ClientSession( connector=connector, json_serialize=ujson.dumps