Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: multi-threading tx sending not efficient #1587

Merged
merged 9 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion testground/benchmark/benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def entrypoint(ctx: Context):

test_finish_entry = f"finish-test-{ctx.params.test_group_id}"
if not ctx.is_validator:
generate_load(cli, ctx.params.num_accounts, ctx.params.num_txs, ctx.global_seq)
generate_load(ctx.params.num_accounts, ctx.params.num_txs, ctx.global_seq)
print("finish test", ctx.group_seq)
ctx.sync.signal_and_wait(
test_finish_entry, ctx.params.test_group_instance_count
Expand Down
21 changes: 11 additions & 10 deletions testground/benchmark/benchmark/peer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import json
import tempfile
from pathlib import Path
Expand Down Expand Up @@ -116,16 +117,16 @@ def init_node(
def gen_genesis(
cli: ChainCommand, leader_home: Path, peers: List[PeerPacket], genesis_patch: dict
):
for peer in peers:
with tempfile.NamedTemporaryFile() as fp:
fp.write(json.dumps(peer.accounts, default=pydantic_encoder).encode())
fp.flush()
cli(
"genesis",
"bulk-add-genesis-account",
fp.name,
home=leader_home,
)
accounts = itertools.chain(peer.accounts for peer in peers)
with tempfile.NamedTemporaryFile() as fp:
fp.write(json.dumps(accounts, default=pydantic_encoder).encode())
fp.flush()
cli(
"genesis",
"bulk-add-genesis-account",
fp.name,
home=leader_home,
)
collect_gen_tx(cli, peers, home=leader_home)
cli("genesis", "validate", home=leader_home)
return patch_json(
Expand Down
68 changes: 58 additions & 10 deletions testground/benchmark/benchmark/sendtx.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import aiohttp
import ujson
import web3
from eth_account import Account

from .utils import gen_account, send_transaction

GAS_PRICE = 1000000000
CHAIN_ID = 777
LOCAL_JSON_RPC = "http://localhost:8545"


def test_tx(nonce: int):
return {
"to": "0x0000000000000000000000000000000000000000",
"value": 1,
"nonce": nonce,
"gas": 21000,
"gasPrice": GAS_PRICE,
"chainId": CHAIN_ID,
}


def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
Expand All @@ -22,15 +38,8 @@ def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):

nonce = initial_nonce
while nonce < initial_nonce + tx_amount:
tx = {
"to": "0x0000000000000000000000000000000000000000",
"value": 1,
"nonce": nonce,
"gas": 21000,
"gasPrice": GAS_PRICE,
}
try:
send_transaction(w3, tx, acct, wait=False)
send_transaction(w3, test_tx(nonce), acct, wait=False)
except ValueError as e:
msg = str(e)
if "invalid nonce" in msg:
Expand All @@ -55,9 +64,9 @@ def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
)


def generate_load(cli, num_accounts, num_txs, global_seq, **kwargs):
def generate_load(num_accounts, num_txs, global_seq, **kwargs):
w3 = web3.Web3(web3.providers.HTTPProvider("http://localhost:8545"))
assert w3.eth.chain_id == 777
assert w3.eth.chain_id == CHAIN_ID
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
with ThreadPoolExecutor(max_workers=num_accounts) as executor:
futs = (executor.submit(sendtx, w3, acct, num_txs) for acct in accounts)
Expand All @@ -66,3 +75,42 @@ def generate_load(cli, num_accounts, num_txs, global_seq, **kwargs):
fut.result()
except Exception as e:
print("test task failed", e)


def prepare_txs(global_seq, num_accounts, num_txs):
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
txs = []
for i in range(num_txs):
for acct in accounts:
tx = {
"to": "0x0000000000000000000000000000000000000000",
"value": 1,
"nonce": i,
"gas": 21000,
"gasPrice": GAS_PRICE,
"chainId": CHAIN_ID,
}
txs.append(acct.sign_transaction(tx).rawTransaction.hex())
yihuang marked this conversation as resolved.
Show resolved Hide resolved
return txs


async def send_txs(txs):
connector = aiohttp.TCPConnector(limit=None)
async with aiohttp.ClientSession(
connector=connector, json_serialize=ujson.dumps
) as session:
tasks = [
asyncio.ensure_future(
session.post(
LOCAL_JSON_RPC,
json={
"jsonrpc": "2.0",
"method": "eth_sendRawTransaction",
"params": [raw],
"id": 1,
},
)
)
for raw in txs
]
await asyncio.gather(*tasks)
23 changes: 10 additions & 13 deletions testground/benchmark/benchmark/stateless.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import shutil
Expand All @@ -22,11 +23,11 @@
init_node,
patch_configs,
)
from .sendtx import generate_load
from .sendtx import prepare_txs, send_txs
from .stats import dump_block_stats
from .topology import connect_all
from .types import PeerPacket
from .utils import block_height, block_txs, wait_for_block, wait_for_port, wait_for_w3
from .utils import block_height, block_txs, wait_for_block, wait_for_port

# use cronosd on host machine
LOCAL_CRONOSD_PATH = "cronosd"
Expand Down Expand Up @@ -206,9 +207,13 @@ def run(outdir: str, datadir: str, cronosd, global_seq):


def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
run_echo_server(ECHO_SERVER_PORT)
if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
txs = prepare_txs(global_seq, cfg["num_accounts"], cfg["num_txs"])
else:
txs = []

# wait for persistent peers to be ready
run_echo_server(ECHO_SERVER_PORT)
wait_for_peers(home)

print("start node")
Expand All @@ -223,16 +228,8 @@ def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
wait_for_port(8545)
wait_for_block(cli, 3)

if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
wait_for_w3()
generate_load(
cli,
cfg["num_accounts"],
cfg["num_txs"],
global_seq,
home=home,
output="json",
)
if txs:
asyncio.run(send_txs(txs))

# node quit when the chain is idle or halted for a while
detect_idle_halted(20, 20)
Expand Down
89 changes: 88 additions & 1 deletion testground/benchmark/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions testground/benchmark/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ hexbytes = "^0"
bech32 = "^1"
requests = "^2.32"
click = "^8.1.7"
ujson = "^5.10.0"

[tool.poetry.dev-dependencies]
pytest = "^8.2"
Expand Down
Loading