Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: test transactions can't be saved and reused #1575

Merged
merged 10 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions testground/benchmark/benchmark/stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import click
import tomlkit

from . import transaction
from .cli import ChainCommand
from .echo import run_echo_server
from .peer import (
Expand All @@ -23,7 +24,6 @@
init_node,
patch_configs,
)
from .sendtx import prepare_txs, send_txs
from .stats import dump_block_stats
from .topology import connect_all
from .types import PeerPacket
Expand Down Expand Up @@ -184,7 +184,7 @@ def run(outdir: str, datadir: str, cronosd, global_seq):
home = datadir / group / str(group_seq)

try:
return do_run(home, cronosd, group, global_seq, cfg)
return do_run(datadir, home, cronosd, group, global_seq, cfg)
finally:
# collect outputs
output = Path("/data.tar.bz2")
Expand All @@ -198,10 +198,45 @@ def run(outdir: str, datadir: str, cronosd, global_seq):
shutil.copy(output, filename)


def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
@cli.command()
@click.argument("outdir")
@click.option("--nodes", default=10)
@click.option("--num-accounts", default=10)
@click.option("--num-txs", default=1000)
def gen_txs(**kwargs):
return _gen_txs(**kwargs)


@cli.command()
@click.argument("options", callback=validate_json)
def generic_gen_txs(options: dict):
return _gen_txs(**options)


def _gen_txs(
outdir: str,
nodes: int = 10,
num_accounts: int = 10,
num_txs: int = 1000,
):
outdir = Path(outdir)
for global_seq in range(nodes):
print("generating", num_accounts * num_txs, "txs for node", global_seq)
txs = transaction.gen(global_seq, num_accounts, num_txs)
transaction.save(txs, outdir, global_seq)
print("saved", len(txs), "txs for node", global_seq)


def do_run(
datadir: Path, home: Path, cronosd: str, group: str, global_seq: int, cfg: dict
):
if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
print("preparing", cfg["num_accounts"] * cfg["num_txs"], "txs")
txs = prepare_txs(global_seq, cfg["num_accounts"], cfg["num_txs"])
txs = transaction.load(datadir, global_seq)
if txs:
print("loaded", len(txs), "txs")
else:
print("generating", cfg["num_accounts"] * cfg["num_txs"], "txs")
txs = transaction.gen(global_seq, cfg["num_accounts"], cfg["num_txs"])
else:
txs = []

Expand All @@ -222,7 +257,8 @@ def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
wait_for_block(cli, 3)

if txs:
asyncio.run(send_txs(txs))
asyncio.run(transaction.send(txs))
print("sent", len(txs), "txs")

# node quit when the chain is idle or halted for a while
detect_idle_halted(20, 20)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
import time
from pathlib import Path

import aiohttp
import ujson
import web3
from eth_account import Account

from .utils import gen_account, send_transaction
from .utils import gen_account

GAS_PRICE = 1000000000
CHAIN_ID = 777
LOCAL_JSON_RPC = "http://localhost:8545"
CONNECTION_POOL_SIZE = 1024
TXS_DIR = "txs"


def test_tx(nonce: int):
Expand All @@ -25,57 +24,35 @@ def test_tx(nonce: int):
}


def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
initial_nonce = w3.eth.get_transaction_count(acct.address)
print(
"test begin, address:",
acct.address,
"balance:",
w3.eth.get_balance(acct.address),
"nonce:",
initial_nonce,
)

nonce = initial_nonce
while nonce < initial_nonce + tx_amount:
try:
send_transaction(w3, test_tx(nonce), acct, wait=False)
except ValueError as e:
msg = str(e)
if "invalid nonce" in msg:
print("invalid nonce and retry", nonce)
time.sleep(1)
continue
if "tx already in mempool" not in msg:
raise

nonce += 1

if nonce % 100 == 0:
print(f"{acct.address} sent {nonce} transactions")

print(
"test end, address:",
acct.address,
"balance:",
w3.eth.get_balance(acct.address),
"nonce:",
w3.eth.get_transaction_count(acct.address),
)


def prepare_txs(global_seq, num_accounts, num_txs):
def gen(global_seq, num_accounts, num_txs) -> [str]:
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
txs = []
for i in range(num_txs):
for acct in accounts:
txs.append(acct.sign_transaction(test_tx(i)).rawTransaction.hex())
if len(txs) % 1000 == 0:
print("prepared", len(txs), "txs")
print("generated", len(txs), "txs for node", global_seq)

return txs


def save(txs: [str], datadir: Path, global_seq: int):
d = datadir / TXS_DIR
d.mkdir(parents=True, exist_ok=True)
path = d / f"{global_seq}.json"
with path.open("w") as f:
ujson.dump(txs, f)


def load(datadir: Path, global_seq: int) -> [str]:
path = datadir / TXS_DIR / f"{global_seq}.json"
if not path.exists():
return

with path.open("r") as f:
return ujson.load(f)


async def async_sendtx(session, raw):
async with session.post(
LOCAL_JSON_RPC,
Expand All @@ -91,7 +68,7 @@ async def async_sendtx(session, raw):
print("send tx error", data["error"])


async def send_txs(txs):
async def send(txs):
connector = aiohttp.TCPConnector(limit=1024)
async with aiohttp.ClientSession(
connector=connector, json_serialize=ujson.dumps
Expand Down
Loading