diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 5601f8efdd..9d30403bc2 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,7 +22,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '^1.22.0' + go-version: '1.22.7' - uses: actions/checkout@v3 - uses: cachix/install-nix-action@v23 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 42893e3c78..5eb40dc770 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## UNRELEASED + +### Improvements + +* [#1592](https://github.com/crypto-org-chain/cronos/pull/1592) Change the default parallelism of the block-stm to the minimum of GOMAXPROCS and NumCPU + *Sep 13, 2024* ## v1.4.0-rc0 diff --git a/app/app.go b/app/app.go index 13b7b36ee4..f4148bffda 100644 --- a/app/app.go +++ b/app/app.go @@ -987,7 +987,7 @@ func New( sdk.SetAddrCacheEnabled(false) workers := cast.ToInt(appOpts.Get(srvflags.EVMBlockSTMWorkers)) if workers == 0 { - workers = stdruntime.NumCPU() + workers = maxParallelism() } preEstimate := cast.ToBool(appOpts.Get(srvflags.EVMBlockSTMPreEstimate)) logger.Info("block-stm executor enabled", "workers", workers, "pre-estimate", preEstimate) @@ -1369,3 +1369,7 @@ func (app *App) Close() error { app.Logger().Info("Application gracefully shutdown", "error", err) return err } + +func maxParallelism() int { + return min(stdruntime.GOMAXPROCS(0), stdruntime.NumCPU()) +} diff --git a/flake.nix b/flake.nix index 5b3887fecc..7649379adc 100644 --- a/flake.nix +++ b/flake.nix @@ -44,7 +44,7 @@ cronosd-testnet = mkApp packages.cronosd-testnet; stateless-testcase = { type = "app"; - program = "${pkgs.testground-testcase}/bin/stateless-testcase"; + program = "${pkgs.benchmark-testcase}/bin/stateless-testcase"; }; }; defaultPackage = packages.cronosd; diff --git a/go.mod b/go.mod index 22f5881c57..217c40e228 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,7 @@ require ( github.com/creachadair/atomicfile v0.3.1 // indirect 
github.com/creachadair/tomledit v0.0.24 // indirect github.com/crypto-org-chain/cronos/memiavl v0.0.4 // indirect - github.com/crypto-org-chain/go-block-stm v0.0.0-20240912024944-1cd89976aa5e // indirect + github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 // indirect github.com/danieljoos/wincred v1.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index 2bb00018c1..8de1903d44 100644 --- a/go.sum +++ b/go.sum @@ -433,8 +433,8 @@ github.com/crypto-org-chain/cosmos-sdk/x/tx v0.0.0-20240916022730-3317ca17f0f1 h github.com/crypto-org-chain/cosmos-sdk/x/tx v0.0.0-20240916022730-3317ca17f0f1/go.mod h1:RTiTs4hkXG6IvYGknvB8p79YgjYJdcbzLUOGJChsPnY= github.com/crypto-org-chain/ethermint v0.6.1-0.20240913100216-dbc7eb41488c h1:pJJNL+ZganmfcxzEijVNqwNDhzXsTyMk/Of1/lUvxlM= github.com/crypto-org-chain/ethermint v0.6.1-0.20240913100216-dbc7eb41488c/go.mod h1:D2lnc8ARuVmgc2/2IWla2Ky1o8/pjmyrnIt+d46Clco= -github.com/crypto-org-chain/go-block-stm v0.0.0-20240912024944-1cd89976aa5e h1:FFpE6+Y4o5GxkeGwUcETM6amgohh7msWvWf1MDqueVc= -github.com/crypto-org-chain/go-block-stm v0.0.0-20240912024944-1cd89976aa5e/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE= +github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 h1:OvD5Rm0B6LHUJk6z858UgwdP72jU2DuUdXeclRyKpDI= +github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE= github.com/crypto-org-chain/go-ethereum v1.10.20-0.20240425065928-ebb09502e7a7 h1:V43F3JFcqG4MUThf9W/DytnPblpR6CcaLBw2Wx6zTgE= github.com/crypto-org-chain/go-ethereum v1.10.20-0.20240425065928-ebb09502e7a7/go.mod h1:+a8pUj1tOyJ2RinsNQD4326YS+leSoKGiG/uVVb0x6Y= github.com/danieljoos/wincred v1.2.0 h1:ozqKHaLK0W/ii4KVbbvluM91W2H3Sh0BncbUNPS7jLE= diff --git a/gomod2nix.toml b/gomod2nix.toml index 3b2744bf5a..c9291a08f6 100644 --- 
a/gomod2nix.toml +++ b/gomod2nix.toml @@ -216,8 +216,8 @@ schema = 3 version = "v0.0.24" hash = "sha256-4vUukHONOjNn0qfQr4esK6TWfPWsIp+rbdz65og84lw=" [mod."github.com/crypto-org-chain/go-block-stm"] - version = "v0.0.0-20240912024944-1cd89976aa5e" - hash = "sha256-rY8W4dSciOXT29MCySbH5sw0Fp15IQVgBK9QlMX0JeU=" + version = "v0.0.0-20240919080136-6c49aef68716" + hash = "sha256-krCdJz96mpIgmpmRI64W2+W1ycNg+EdLaqeVtAuaW4A=" [mod."github.com/danieljoos/wincred"] version = "v1.2.0" hash = "sha256-LHcvTJCc8++bFndbd8ZgMSTe4L5h2C4rN+cSWHCz54Y=" diff --git a/nix/testground-image.nix b/nix/testground-image.nix index 5644a3f78c..629ae3ae8c 100644 --- a/nix/testground-image.nix +++ b/nix/testground-image.nix @@ -1,4 +1,4 @@ -{ dockerTools, runCommandLocal, cronos-matrix, testground-testcase }: +{ dockerTools, runCommandLocal, cronos-matrix, benchmark-testcase }: let patched-cronosd = cronos-matrix.cronosd.overrideAttrs (oldAttrs: { patches = oldAttrs.patches or [ ] ++ [ @@ -15,13 +15,13 @@ dockerTools.buildLayeredImage { name = "cronos-testground"; created = "now"; contents = [ - testground-testcase + benchmark-testcase patched-cronosd tmpDir ]; config = { Expose = [ 9090 26657 26656 1317 26658 26660 26659 30000 ]; - Cmd = [ "/bin/testground-testcase" ]; + Cmd = [ "/bin/stateless-testcase" ]; Env = [ "PYTHONUNBUFFERED=1" ]; diff --git a/testground/benchmark/benchmark/context.py b/testground/benchmark/benchmark/context.py deleted file mode 100644 index 8d8168a60b..0000000000 --- a/testground/benchmark/benchmark/context.py +++ /dev/null @@ -1,129 +0,0 @@ -import os -import socket - -from .params import RunParams, run_params -from .sync import SyncService - -LEADER_SEQUENCE = 1 - - -class Context: - def __init__(self, params: RunParams = None): - if params is None: - params = run_params() - self.params = params - self._sync = None - - @property - def sync(self) -> SyncService: - if self._sync is None: - self._sync = SyncService(self.params) - return self._sync - - def 
init_common(self): - self.wait_network_ready() - - self.global_seq = self.sync.signal_entry("initialized_global") - self.group_seq = self.sync.signal_and_wait( - f"initialized_group_{self.params.test_group_id}", - self.params.test_group_instance_count, - ) - - print("global_seq:", self.global_seq, "group_seq:", self.group_seq) - - print("start initializing network address") - self.config_network(self.params.network_config(self.global_seq)) - - os.environ["TMPDIR"] = self.params.test_temp_path - - def wait_network_ready(self): - self.record_stage_start("network-initialized") - - if self.params.test_sidecar: - self.sync.barrier("network-initialized", self.params.test_instance_count) - - print("network initialisation successful") - - self.record_stage_end("network-initialized") - - def config_network(self, config: dict): - if not self.params.test_sidecar: - print( - "ignoring network change request; running in a sidecar-less environment" - ) - return - - assert config.get("callback_state"), "no callback state provided" - - return self.sync.publish_and_wait( - "network:" + socket.gethostname(), - config, - config["callback_state"], - self.params.test_instance_count, - ) - - def record_success(self): - return self.sync.signal_event( - { - "success_event": { - "group": self.params.test_group_id, - }, - } - ) - - def record_failure(self, error: str): - return self.sync.signal_event( - { - "failure_event": { - "group": self.params.test_group_id, - "error": error, - }, - } - ) - - def record_stage_start(self, name: str): - self.sync.signal_event( - { - "stage_start_event": { - "name": name, - "group": self.params.test_group_id, - }, - } - ) - - def record_stage_end(self, name: str): - self.sync.signal_event( - { - "stage_end_event": { - "name": name, - "group": self.params.test_group_id, - } - } - ) - - @property - def is_leader(self) -> bool: - return self.global_seq == LEADER_SEQUENCE - - @property - def is_fullnode_leader(self) -> bool: - return not self.is_validator and 
self.group_seq == LEADER_SEQUENCE - - @property - def is_validator_leader(self) -> bool: - return self.is_validator and self.group_seq == LEADER_SEQUENCE - - @property - def is_validator(self) -> bool: - return self.params.is_validator - - @property - def is_fullnode(self) -> bool: - return not self.params.is_validator - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._sync is not None: - self._sync.close() diff --git a/testground/benchmark/benchmark/main.py b/testground/benchmark/benchmark/main.py deleted file mode 100644 index ba2ba27d01..0000000000 --- a/testground/benchmark/benchmark/main.py +++ /dev/null @@ -1,83 +0,0 @@ -import os -import subprocess -from pathlib import Path - -import web3 - -from .cli import ChainCommand -from .context import Context -from .peer import CONTAINER_CRONOSD_PATH, bootstrap -from .sendtx import generate_load -from .utils import wait_for_block, wait_for_port - - -def influxdb_url(): - return os.environ.get("INFLUXDB_URL", "http://testground-influxdb:8086") - - -def entrypoint(ctx: Context): - ctx.init_common() - - cli = ChainCommand(CONTAINER_CRONOSD_PATH) - - # build the genesis file collectively, and setup the network topology - bootstrap(ctx, cli) - - # start the node - logfile = Path(ctx.params.test_outputs_path) / "node.log" - proc = subprocess.Popen( - [CONTAINER_CRONOSD_PATH, "start"], - stdout=open(logfile, "ab", buffering=0), - ) - - wait_for_port(26657) - wait_for_port(8545) - wait_for_block(cli, 1) - - test_finish_entry = f"finish-test-{ctx.params.test_group_id}" - if not ctx.is_validator: - generate_load(ctx.params.num_accounts, ctx.params.num_txs, ctx.global_seq) - print("finish test", ctx.group_seq) - ctx.sync.signal_and_wait( - test_finish_entry, ctx.params.test_group_instance_count - ) - - if ctx.is_fullnode_leader: - # collect output - w3 = web3.Web3(web3.providers.HTTPProvider("http://localhost:8545")) - for i in range(w3.eth.block_number): - blk = 
w3.eth.get_block(i) - print(i, len(blk.transactions), blk.timestamp) - - # halt after all tasks are done - ctx.sync.signal_and_wait("halt", ctx.params.test_instance_count) - - proc.kill() - try: - proc.wait(5) - except subprocess.TimeoutExpired: - pass - - ctx.record_success() - - -def info(ctx: Context): - """ - Print the runtime configuration, mainly to check if the image is built successfully. - """ - print(ctx.params) - - -TEST_CASES = { - "entrypoint": entrypoint, - "info": info, -} - - -def main(): - with Context() as ctx: - TEST_CASES[ctx.params.test_case](ctx) - - -if __name__ == "__main__": - main() diff --git a/testground/benchmark/benchmark/network.py b/testground/benchmark/benchmark/network.py deleted file mode 100644 index c0b925c490..0000000000 --- a/testground/benchmark/benchmark/network.py +++ /dev/null @@ -1,26 +0,0 @@ -import ipaddress -from typing import List - -import netifaces - -from .params import RunParams - - -def get_data_ip(params: RunParams) -> ipaddress.IPv4Address: - """ - Get the data network IP address - """ - if not params.test_sidecar: - return "127.0.0.1" - - for addr in ip4_addresses(): - if addr in params.test_subnet: - return addr - - -def ip4_addresses() -> List[ipaddress.IPv4Address]: - ip_list = [] - for interface in netifaces.interfaces(): - for link in netifaces.ifaddresses(interface).get(netifaces.AF_INET, []): - ip_list.append(ipaddress.IPv4Address(link["addr"])) - return ip_list diff --git a/testground/benchmark/benchmark/params.py b/testground/benchmark/benchmark/params.py deleted file mode 100644 index 058d7e72d2..0000000000 --- a/testground/benchmark/benchmark/params.py +++ /dev/null @@ -1,127 +0,0 @@ -import ipaddress -import os -from datetime import datetime -from typing import Dict, Optional - -from pydantic import BaseModel -from pydantic.functional_validators import BeforeValidator -from typing_extensions import Annotated - -VALIDATOR_GROUP_ID = "validators" - - -def parse_dict(s: any) -> Dict[str, str]: - if 
isinstance(s, str): - return dict(part.split("=") for part in s.split("|")) - return s - - -Params = Annotated[dict, BeforeValidator(parse_dict)] - - -class RunParams(BaseModel): - test_branch: Optional[str] = "" - test_case: Optional[str] = "" - test_group_id: Optional[str] = "" - test_group_instance_count: Optional[int] = 0 - test_instance_count: Optional[int] = 0 - test_instance_params: Optional[Params] = None - test_instance_role: Optional[str] = "" - test_outputs_path: Optional[str] = "" - test_plan: Optional[str] = "" - test_repo: Optional[str] = "" - test_run: Optional[str] = "" - test_sidecar: Optional[bool] = False - test_start_time: Optional[datetime] = None - test_subnet: Optional[ipaddress.IPv4Network] = None - test_tag: Optional[str] = "" - test_capture_profiles: Optional[str] = "" - test_temp_path: Optional[str] = "" - log_level: Optional[str] = "" - - def events_key(self): - return ( - f"run:{self.test_run}" - f":plan:{self.test_plan}" - f":case:{self.test_case}" - ":run_events" - ) - - def topic_key(self, topic: str): - return ( - f"run:{self.test_run}" - f":plan:{self.test_plan}" - f":case:{self.test_case}" - f":topics:{topic}" - ) - - def state_key(self, name: str): - return ( - f"run:{self.test_run}" - f":plan:{self.test_plan}" - f":case:{self.test_case}" - f":states:{name}" - ) - - def ipaddress(self, seq: int) -> ipaddress.IPv4Address: - # add 256 to avoid conflict with system services - return self.test_subnet.network_address + (seq + 256) - - def network_config( - self, global_seq: int, callback_state="network-configured" - ) -> dict: - """ - config ip address based on global seq - """ - return { - "network": "default", - "enable": True, - # using the assigned `GlobalSequencer` id per each of instance - # to fill in the last 2 octets of the new IP address for the instance - "IPv4": str(self.ipaddress(global_seq)) + "/16", - "IPv6": None, - "rules": None, - "default": { - "latency": 0, - "jitter": 0, - "bandwidth": 0, - "filter": 0, - "loss": 
0, - "corrupt": 0, - "corrupt_corr": 0, - "reorder": 0, - "reorder_corr": 0, - "duplicate": 0, - "duplicate_corr": 0, - }, - "callback_state": callback_state, - "routing_policy": "allow_all", - } - - @property - def is_validator(self) -> bool: - return self.test_group_id == VALIDATOR_GROUP_ID - - @property - def chain_id(self) -> str: - return self.test_instance_params["chain_id"] - - @property - def num_accounts(self) -> int: - return int(self.test_instance_params["num_accounts"]) - - @property - def num_txs(self) -> int: - return int(self.test_instance_params["num_txs"]) - - -def run_params(env=None) -> RunParams: - if env is None: - env = os.environ - - d = { - name: env[name.upper()] - for name in RunParams.model_fields - if name.upper() in env - } - return RunParams(**d) diff --git a/testground/benchmark/benchmark/peer.py b/testground/benchmark/benchmark/peer.py index f8b043b758..deeb860ffe 100644 --- a/testground/benchmark/benchmark/peer.py +++ b/testground/benchmark/benchmark/peer.py @@ -4,12 +4,10 @@ from pathlib import Path from typing import List +import jsonmerge from pydantic.json import pydantic_encoder from .cli import ChainCommand -from .context import Context -from .network import get_data_ip -from .topology import connect_all from .types import Balance, GenesisAccount, PeerPacket from .utils import eth_to_bech32, gen_account, patch_json, patch_toml @@ -24,37 +22,6 @@ CONTAINER_CRONOSD_PATH = "/bin/cronosd" -def bootstrap(ctx: Context, cli) -> PeerPacket: - home = Path.home() / ".cronos" - peer = init_node( - cli, - home, - get_data_ip(ctx.params), - ctx.params.chain_id, - ctx.params.test_group_id, - ctx.group_seq, - ctx.global_seq, - ) - - data = ctx.sync.publish_subscribe_simple( - "peers", peer.dict(), ctx.params.test_instance_count - ) - peers: List[PeerPacket] = [PeerPacket.model_validate(item) for item in data] - - if ctx.is_fullnode_leader: - # prepare genesis file and publish - genesis = gen_genesis(cli, home, peers, {}) - 
ctx.sync.publish("genesis", genesis) - else: - genesis = ctx.sync.subscribe_simple("genesis", 1)[0] - (home / "config" / "genesis.json").write_text(json.dumps(genesis)) - cli("genesis", "validate", home=home) - - p2p_peers = connect_all(peer, peers) - patch_configs(home, p2p_peers, {}, {}) - return peer - - def init_node( cli: ChainCommand, home: Path, @@ -133,41 +100,54 @@ def gen_genesis( print("genesis validated") return patch_json( leader_home / "config" / "genesis.json", - { - "consensus.params.block.max_gas": "163000000", - "app_state.evm.params.evm_denom": "basecro", - "app_state.feemarket.params.no_base_fee": True, - **genesis_patch, - }, + jsonmerge.merge( + { + "consensus": {"params": {"block": {"max_gas": "163000000"}}}, + "app_state": { + "evm": {"params": {"evm_denom": "basecro"}}, + "feemarket": {"params": {"no_base_fee": True}}, + }, + }, + genesis_patch, + ), ) def patch_configs(home: Path, peers: str, config_patch: dict, app_patch: dict): default_config_patch = { "db_backend": "rocksdb", - "p2p.addr_book_strict": False, - "mempool.recheck": False, - "mempool.size": MEMPOOL_SIZE, - "consensus.timeout_commit": "1s", - "tx_index.indexer": "null", + "p2p": {"addr_book_strict": False}, + "mempool": { + "recheck": False, + "size": MEMPOOL_SIZE, + }, + "consensus": {"timeout_commit": "1s"}, + "tx_index": {"indexer": "null"}, } default_app_patch = { "minimum-gas-prices": "0basecro", "index-events": ["ethereum_tx.ethereumTxHash"], - "memiavl.enable": True, - "mempool.max-txs": MEMPOOL_SIZE, - "evm.block-executor": "block-stm", # or "sequential" - "evm.block-stm-workers": 0, - "evm.block-stm-pre-estimate": True, - "json-rpc.enable-indexer": True, + "memiavl": { + "enable": True, + "cache-size": 0, + }, + "mempool": {"max-txs": MEMPOOL_SIZE}, + "evm": { + "block-executor": "block-stm", # or "sequential" + "block-stm-workers": 0, + "block-stm-pre-estimate": True, + }, + "json-rpc": {"enable-indexer": True}, } # update persistent_peers and other configs in 
config.toml - config_patch = { - **default_config_patch, - **config_patch, - "p2p.persistent_peers": peers, - } - app_patch = {**default_app_patch, **app_patch} + config_patch = jsonmerge.merge( + default_config_patch, + jsonmerge.merge( + config_patch, + {"p2p": {"persistent_peers": peers}}, + ), + ) + app_patch = jsonmerge.merge(default_app_patch, app_patch) patch_toml(home / "config" / "config.toml", config_patch) patch_toml(home / "config" / "app.toml", app_patch) diff --git a/testground/benchmark/benchmark/sendtx.py b/testground/benchmark/benchmark/sendtx.py index 51ed6fd7d8..24b11f8ba4 100644 --- a/testground/benchmark/benchmark/sendtx.py +++ b/testground/benchmark/benchmark/sendtx.py @@ -1,14 +1,11 @@ import asyncio -import time -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor from pathlib import Path import aiohttp import ujson -import web3 -from eth_account import Account -from .utils import gen_account, send_transaction +from .utils import gen_account GAS_PRICE = 1000000000 CHAIN_ID = 777 @@ -28,58 +25,6 @@ def test_tx(nonce: int): } -def sendtx(w3: web3.Web3, acct: Account, tx_amount: int): - initial_nonce = w3.eth.get_transaction_count(acct.address) - print( - "test begin, address:", - acct.address, - "balance:", - w3.eth.get_balance(acct.address), - "nonce:", - initial_nonce, - ) - - nonce = initial_nonce - while nonce < initial_nonce + tx_amount: - try: - send_transaction(w3, test_tx(nonce), acct, wait=False) - except ValueError as e: - msg = str(e) - if "invalid nonce" in msg: - print("invalid nonce and retry", nonce) - time.sleep(1) - continue - if "tx already in mempool" not in msg: - raise - - nonce += 1 - - if nonce % 100 == 0: - print(f"{acct.address} sent {nonce} transactions") - - print( - "test end, address:", - acct.address, - "balance:", - w3.eth.get_balance(acct.address), - "nonce:", - w3.eth.get_transaction_count(acct.address), - ) - - -def generate_load(num_accounts, 
num_txs, global_seq): - w3 = web3.Web3(web3.providers.HTTPProvider(LOCAL_JSON_RPC)) - assert w3.eth.chain_id == CHAIN_ID - accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)] - with ThreadPoolExecutor(max_workers=num_accounts) as executor: - futs = (executor.submit(sendtx, w3, acct, num_txs) for acct in accounts) - for fut in as_completed(futs): - try: - fut.result() - except Exception as e: - print("test task failed", e) - - def async_write_tx(account, tx, file_path): raw_tx = account.sign_transaction(tx).rawTransaction with open(file_path, "w") as file: diff --git a/testground/benchmark/benchmark/stateless.py b/testground/benchmark/benchmark/stateless.py index f12ba6016e..e3563f393a 100644 --- a/testground/benchmark/benchmark/stateless.py +++ b/testground/benchmark/benchmark/stateless.py @@ -91,14 +91,6 @@ def _gen( (outdir / VALIDATOR_GROUP).mkdir(parents=True, exist_ok=True) (outdir / FULLNODE_GROUP).mkdir(parents=True, exist_ok=True) - config_patch = ( - json.loads(config_patch) if isinstance(config_patch, str) else config_patch - ) - app_patch = json.loads(app_patch) if isinstance(app_patch, str) else app_patch - genesis_patch = ( - json.loads(genesis_patch) if isinstance(genesis_patch, str) else genesis_patch - ) - peers = [] for i in range(validators): print("init validator", i) diff --git a/testground/benchmark/benchmark/sync.py b/testground/benchmark/benchmark/sync.py deleted file mode 100644 index ec1f380caa..0000000000 --- a/testground/benchmark/benchmark/sync.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -client for sync service -""" - -import json -import os -import queue -import threading - -import websocket - -from .params import RunParams - - -class SyncService: - def __init__(self, params: RunParams, url=None): - if not url: - url = sync_service_url() - self._id = 0 - self._handlers = {} - self._quit = False - self.ws = websocket.create_connection(url) - self._params = params - - self._recv_thread = 
threading.Thread(target=self.recv_loop) - self._recv_thread.start() - - def recv_loop(self): - while not self._quit: - rsp = self.ws.recv() - if not rsp: - break - self.on_message(rsp) - - handlers = self._handlers - self._handlers = None - - for handler in handlers.values(): - handler(None) - - def on_message(self, msg): - msg = json.loads(msg) - id = int(msg["id"]) - try: - callback = self._handlers[id] - except KeyError: - return - callback(msg) - - def close(self): - self._quit = True - self.ws.close() - self._recv_thread.join() - - def next_id(self): - self._id += 1 - return self._id - - def _request(self, name, payload): - "send request, wait for response" - id = self.next_id() - cond = Condition() - self._handlers[id] = cond.notify - - self._send( - { - "id": str(id), - name: payload, - } - ) - - rsp = cond.wait() - if rsp is None: - # connection closed - return - del self._handlers[id] - assert not rsp.get("error"), rsp - return rsp - - def _publish(self, topic, payload=None): - rsp = self._request( - "publish", - { - "topic": topic, - "payload": payload, - }, - ) - return rsp["publish"]["seq"] - - def _subscribe(self, topic, callback): - "send one request, recv multiple responses" - - def on_recv(msg): - callback(json.loads(msg["subscribe"]) if msg is not None else None) - - id = self.next_id() - self._handlers[id] = on_recv - self._send( - { - "id": str(id), - "subscribe": { - "topic": topic, - }, - } - ) - - def signal_event(self, event): - return self._publish(self._params.events_key(), event) - - def publish(self, topic, payload=None): - return self._publish(self._params.topic_key(topic), payload) - - def barrier(self, state, target): - self._request( - "barrier", - { - "state": self._params.state_key(state), - "target": target, - }, - ) - - def signal_entry(self, state): - rsp = self._request( - "signal_entry", - { - "state": self._params.state_key(state), - }, - ) - return rsp["signal_entry"]["seq"] - - def subscribe(self, topic, callback): - return 
self._subscribe(self._params.topic_key(topic), callback) - - def subscribe_simple(self, topic, n): - """ - subscribe_simple wait for exactly n messages on the topic and return them - """ - q = queue.Queue() - self.subscribe(topic, q.put) - return [q.get() for _ in range(n)] - - def _send(self, payload): - self.ws.send(json.dumps(payload)) - - def publish_and_wait(self, topic, payload, state, target): - """ - composes Publish and a Barrier. It first publishes the - provided payload to the specified topic, then awaits for a barrier on the - supplied state to reach the indicated target. - - If any operation fails, PublishAndWait short-circuits and returns a non-nil - error and a negative sequence. If Publish succeeds, but the Barrier fails, - the seq number will be greater than zero. - """ - seq = self.publish(topic, payload) - self.barrier(state, target) - return seq - - def publish_subscribe(self, topic, payload, callback): - """ - publish_subscribe publishes the payload on the supplied Topic, then subscribes - to it, sending paylods to the supplied channel. 
- """ - seq = self.publish(topic, payload) - self.subscribe(topic, callback) - return seq - - def publish_subscribe_simple(self, topic, payload, n): - self.publish(topic, payload) - return self.subscribe_simple(topic, n) - - def signal_and_wait(self, state, target): - seq = self.signal_entry(state) - self.barrier(state, target) - return seq - - -def sync_service_url(): - host = os.environ.get("SYNC_SERVICE_HOST", "testground-sync-service") - port = os.environ.get("SYNC_SERVICE_PORT", "5050") - return f"ws://{host}:{port}" - - -class Condition: - """ - Condition with payload - """ - - def __init__(self): - self._cond = threading.Condition() - self._value = None - - def wait(self): - with self._cond: - if self._value is not None: - return self._value - self._cond.wait() - return self._value - - def notify(self, value): - with self._cond: - self._value = value - self._cond.notify() diff --git a/testground/benchmark/benchmark/test_params.py b/testground/benchmark/benchmark/test_params.py deleted file mode 100644 index ed556b8b58..0000000000 --- a/testground/benchmark/benchmark/test_params.py +++ /dev/null @@ -1,57 +0,0 @@ -import ipaddress -from datetime import datetime - -import pytest -from pydantic import ValidationError - -from .params import RunParams, parse_dict, run_params - - -def test_params(): - exp = RunParams( - test_case="entrypoint", - test_group_id="single", - test_group_instance_count=2, - test_instance_count=2, - test_instance_params=parse_dict( - "latency=0|timeout=21m|bandwidth=420Mib|chain_id=testground" - ), - test_outputs_path="/outputs", - test_plan="benchmark", - test_run="cp9va5nae0pksdti05vg", - test_start_time=datetime.fromisoformat("2024-05-27T10:52:08+08:00"), - test_subnet=ipaddress.IPv4Network("16.20.0.0/16"), - test_sidecar=True, - test_temp_path="/temp", - ) - actual = run_params( - { - "TEST_BRANCH": "", - "TEST_CASE": "entrypoint", - "TEST_GROUP_ID": "single", - "TEST_GROUP_INSTANCE_COUNT": "2", - "TEST_INSTANCE_COUNT": "2", - 
"TEST_INSTANCE_PARAMS": ( - "latency=0|timeout=21m|bandwidth=420Mib|chain_id=testground" - ), - "TEST_INSTANCE_ROLE": "", - "TEST_OUTPUTS_PATH": "/outputs", - "TEST_PLAN": "benchmark", - "TEST_REPO": "", - "TEST_RUN": "cp9va5nae0pksdti05vg", - "TEST_START_TIME": "2024-05-27T10:52:08+08:00", - "TEST_SIDECAR": "true", - "TEST_SUBNET": "16.20.0.0/16", - "TEST_TAG": "", - "TEST_CAPTURE_PROFILES": "", - "TEST_TEMP_PATH": "/temp", - } - ) - assert exp == actual - - -def test_params_bool(): - assert RunParams(test_sidecar=True) == run_params({"TEST_SIDECAR": "true"}) - assert RunParams(test_sidecar=False) == run_params({"TEST_SIDECAR": "false"}) - with pytest.raises(ValidationError): - run_params({"TEST_SIDECAR": "fse"}) diff --git a/testground/benchmark/benchmark/test_sync.py b/testground/benchmark/benchmark/test_sync.py deleted file mode 100644 index 4cac6da17f..0000000000 --- a/testground/benchmark/benchmark/test_sync.py +++ /dev/null @@ -1,48 +0,0 @@ -import concurrent.futures -import ipaddress -from datetime import datetime - -from .params import RunParams -from .sync import SyncService - -SYNC_SERVICE_URL = "ws://localhost:5050" -TEST_PARAMS = RunParams( - test_case="entrypoint", - test_group_id="single", - test_group_instance_count=2, - test_instance_count=2, - test_instance_params=("latency=0|timeout=21m|bandwidth=420Mib|chain_id=testground"), - test_outputs_path="/outputs", - test_plan="benchmark", - test_run="cp9va5nae0pksdti05vg", - test_start_time=datetime.fromisoformat("2024-05-27T10:52:08+08:00"), - test_subnet=ipaddress.IPv4Network("16.20.0.0/16"), - test_sidecar=True, - test_temp_path="/temp", -) - - -def test_barrier(): - sync = SyncService(TEST_PARAMS, SYNC_SERVICE_URL) - state = "test1" - target = 10 - for i in range(target): - sync.signal_entry(state) - sync.barrier(state, target) - sync.close() - - -def test_signal_and_wait(): - sync = SyncService(TEST_PARAMS, SYNC_SERVICE_URL) - state = "test_signal_and_wait" - target = 10 - - def do_test(): - 
sync.signal_and_wait(state, target) - - with concurrent.futures.ThreadPoolExecutor(max_workers=target) as executor: - futs = [executor.submit(do_test) for i in range(target)] - for fut in futs: - fut.result(1) - - sync.close() diff --git a/testground/benchmark/benchmark/utils.py b/testground/benchmark/benchmark/utils.py index 172a956887..6cbab687a6 100644 --- a/testground/benchmark/benchmark/utils.py +++ b/testground/benchmark/benchmark/utils.py @@ -4,6 +4,7 @@ from pathlib import Path import bech32 +import jsonmerge import requests import tomlkit import web3 @@ -15,26 +16,24 @@ LOCAL_RPC = "http://localhost:26657" -def patch_dict(doc, kwargs): - for k, v in kwargs.items(): - keys = k.split(".") - assert len(keys) > 0 - cur = doc - for section in keys[:-1]: - cur = cur[section] - cur[keys[-1]] = v +def patch_toml_doc(doc, patch): + for k, v in patch.items(): + if isinstance(v, dict): + patch_toml_doc(doc.setdefault(k, {}), v) + else: + doc[k] = v -def patch_toml(path: Path, kwargs): +def patch_toml(path: Path, patch): doc = tomlkit.parse(path.read_text()) - patch_dict(doc, kwargs) + patch_toml_doc(doc, patch) path.write_text(tomlkit.dumps(doc)) return doc -def patch_json(path: Path, kwargs): +def patch_json(path: Path, patch): doc = json.loads(path.read_text()) - patch_dict(doc, kwargs) + doc = jsonmerge.merge(doc, patch) path.write_text(json.dumps(doc)) return doc diff --git a/testground/benchmark/flake.nix b/testground/benchmark/flake.nix index 3883096eed..f71ff46a46 100644 --- a/testground/benchmark/flake.nix +++ b/testground/benchmark/flake.nix @@ -63,19 +63,19 @@ }; in rec { - packages.default = pkgs.testground-testcase; + packages.default = pkgs.benchmark-testcase; apps = { default = { type = "app"; - program = "${pkgs.testground-testcase}/bin/testground-testcase"; + program = "${pkgs.benchmark-testcase}/bin/stateless-testcase"; }; stateless-testcase = { type = "app"; - program = "${pkgs.testground-testcase}/bin/stateless-testcase"; + program = 
"${pkgs.benchmark-testcase}/bin/stateless-testcase"; }; }; devShells.default = pkgs.mkShell { - buildInputs = [ pkgs.testground-testcase-env ]; + buildInputs = [ pkgs.benchmark-testcase-env ]; }; legacyPackages = pkgs; }) diff --git a/testground/benchmark/overlay.nix b/testground/benchmark/overlay.nix index bb37c0a1ef..328d6ce9f0 100644 --- a/testground/benchmark/overlay.nix +++ b/testground/benchmark/overlay.nix @@ -40,6 +40,6 @@ let in { - testground-testcase = final.callPackage benchmark { }; - testground-testcase-env = final.callPackage benchmark-env { }; + benchmark-testcase = final.callPackage benchmark { }; + benchmark-testcase-env = final.callPackage benchmark-env { }; } diff --git a/testground/benchmark/poetry.lock b/testground/benchmark/poetry.lock index d0e4207cb0..97de198f91 100644 --- a/testground/benchmark/poetry.lock +++ b/testground/benchmark/poetry.lock @@ -965,6 +965,20 @@ files = [ {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "jsonmerge" +version = "1.9.2" +description = "Merge a series of JSON documents." 
+optional = false +python-versions = "*" +files = [ + {file = "jsonmerge-1.9.2-py3-none-any.whl", hash = "sha256:cab93ee7763fb51a4a09bbdab2eacf499ffb208ab94247ae86acea3e0e823b05"}, + {file = "jsonmerge-1.9.2.tar.gz", hash = "sha256:c43757e0180b0e19b7ae4c130ad42a07cc580c31912f61f4823e8eaf2fa394a3"}, +] + +[package.dependencies] +jsonschema = ">2.4.0" + [[package]] name = "jsonschema" version = "4.22.0" @@ -2188,4 +2202,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "f95059b65ce788a6cef6d06ec8d9dd9e65a819502003b2c3fa269647c0bc299d" +content-hash = "0fb7ea7cb80f99d281286bd73bc04631c191c6b75b7375fa98cc9e590ff65074" diff --git a/testground/benchmark/pyproject.toml b/testground/benchmark/pyproject.toml index e626b7f291..132cfc07f3 100644 --- a/testground/benchmark/pyproject.toml +++ b/testground/benchmark/pyproject.toml @@ -17,6 +17,7 @@ bech32 = "^1" requests = "^2.32" click = "^8.1.7" ujson = "^5.10.0" +jsonmerge = "^1.9.2" [tool.poetry.dev-dependencies] pytest = "^8.2" @@ -26,7 +27,6 @@ requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] -testground-testcase = "benchmark.main:main" stateless-testcase = "benchmark.stateless:cli" [tool.black]