Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: no cluster benchmarking #1431

Merged
merged 23 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go.work
go.work.sum

# nix
/result
result

# pystarport
/data
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* (e2ee)[#1415](https://github.com/crypto-org-chain/cronos/pull/1415) Add batch keys query for e2ee module.
* (e2ee)[#1421](https://github.com/crypto-org-chain/cronos/pull/1421) Validate e2ee key when register.
* (store) [#1448](https://github.com/crypto-org-chain/cronos/pull/1448) Upgrade rocksdb to `v9.1.1`.
* [#1431](https://github.com/crypto-org-chain/cronos/pull/1431) Integrate testground to run benchmark on cluster.

### Bug Fixes

Expand Down
62 changes: 62 additions & 0 deletions testground/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Guide

[Testground documentation](https://docs.testground.ai/)

## Getting started

### Prerequisites

- docker
- go 1.22 or higher

### Install Testground

```bash
$ git clone https://github.com/testground/testground.git
$ cd testground
# compile Testground and all related dependencies
$ make install
```

It'll install the `testground` binary in your `$GOPATH/bin` directory, and build several docker images.

### Running Testground

```bash
$ TESTGROUND_HOME=$PWD/data testground daemon
```

Keep the daemon process running during the test.

### Running Test Plan

Import the test plan before the first run:

```bash
$ TESTGROUND_HOME=$PWD/data testground plan import --from /path/to/cronos/testground/benchmark
```

Run the benchmark test plan in a local docker environment:

```bash
$ testground run composition -f /path/to/cronos/testground/benchmark/compositions/local.toml --wait
```

### macOS

If you use `colima` as the docker runtime on macOS, create the symbolic link `/var/run/docker.sock`:

```bash
$ sudo ln -s $HOME/.colima/docker.sock /var/run/docker.sock
```

And mount the related directories into the virtual machine:

```yaml
mounts:
  - location: /var/folders
    writable: false
  - location: <TESTGROUND_HOME>
    writable: true
```

Empty file.
52 changes: 52 additions & 0 deletions testground/benchmark/benchmark/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import os
import subprocess


class ChainCommand:
    """Callable wrapper that runs a command-line program through the shell."""

    def __init__(self, cmd):
        # `cmd` is the program (plus any fixed prefix) placed in front of the
        # arguments of every invocation.
        self.cmd = cmd

    def raw(self, *args, stdin=None, stderr=subprocess.STDOUT, **kwargs):
        """Execute the command and return its raw output.

        Positional and keyword arguments are turned into command-line tokens
        by `build_cli_args_safe`. `stdin` is fed to the process as its input,
        `stderr` is forwarded to `interact` (merged into stdout by default),
        and the process runs with the current `os.environ`.
        """
        joined = " ".join(build_cli_args_safe(*args, **kwargs))
        command_line = f"{self.cmd} {joined}"
        return interact(command_line, input=stdin, stderr=stderr, env=os.environ)

    def __call__(self, *args, **kwargs):
        """Execute the command and return its output decoded and stripped."""
        output = self.raw(*args, **kwargs)
        return output.decode().strip()


def interact(cmd, ignore_error=False, input=None, **kwargs):
    """Run `cmd` through the shell and return its captured stdout.

    Args:
        cmd: the command line, passed to `subprocess.Popen` with `shell=True`.
        ignore_error: when true, a non-zero exit status is not reported.
        input: optional data written to the process's stdin.
        **kwargs: extra `subprocess.Popen` options; `stderr` defaults to
            `subprocess.STDOUT`, so error output is merged into the result.

    Returns:
        Whatever the process wrote to stdout, as returned by `communicate()`.

    Raises:
        AssertionError: the command exited with a non-zero status and
            `ignore_error` is false; the message holds the output and `cmd`.
    """
    kwargs.setdefault("stderr", subprocess.STDOUT)
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        shell=True,
        **kwargs,
    )
    (stdout, _) = proc.communicate(input=input)
    if not ignore_error and proc.returncode != 0:
        # Raise explicitly instead of using `assert` so the check is not
        # stripped under `python -O`; decode leniently so output that is not
        # valid UTF-8 cannot mask the real failure with a UnicodeDecodeError.
        output = stdout.decode("utf-8", errors="replace")
        raise AssertionError(f"{output} ({cmd})")
    return stdout
mmsqe marked this conversation as resolved.
Show resolved Hide resolved


def build_cli_args_safe(*args, **kwargs):
    """Turn positional and keyword arguments into a list of CLI tokens.

    Falsy positional arguments are dropped. Each keyword whose value is not
    `None` becomes a `--flag value` pair, where the flag is the keyword with
    surrounding underscores stripped and inner underscores turned into
    dashes. Values go through `safe_cli_string`; every token is a `str`.
    """
    positional = [safe_cli_string(value) for value in args if value]
    options = []
    for name, value in kwargs.items():
        if value is not None:
            flag = "--" + name.strip("_").replace("_", "-")
            options += [flag, safe_cli_string(value)]
    return [str(token) for token in positional + options]


def safe_cli_string(s):
    """Return `s` as text, single-quoted for the shell when it has spaces.

    A value made of more than one whitespace-separated token is wrapped in
    single quotes so the shell passes it as one argument. Single quotes
    inside such a value are escaped as `'\\''`, otherwise they would end the
    quoting early and break the command line. Any other value is returned
    as its plain string form.
    """
    text = f"{s}"
    if len(text.split()) > 1:
        # close the quote, emit an escaped quote, reopen the quote
        escaped = text.replace("'", "'\\''")
        return f"'{escaped}'"
    return text
110 changes: 110 additions & 0 deletions testground/benchmark/benchmark/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import socket

from .params import RunParams, run_params
from .sync import SyncService

# Global sequence number that identifies the leader instance;
# `Context.is_leader` compares the instance's `global_seq` against it.
LEADER_GLOBAL_SEQUENCE = 1


class Context:
    """Test-run context bundling the run parameters with the sync service.

    Usable as a context manager: leaving the `with` block closes the sync
    service.
    """

    def __init__(self, params: RunParams = None):
        # Fall back to `run_params()` when no parameters are supplied.
        self.params = run_params() if params is None else params
        self.sync = SyncService(self.params)

    def init_common(self):
        """Run the shared start-up steps.

        Waits for the network, obtains the global and group sequence numbers
        from the sync service, applies the network configuration and points
        `TMPDIR` at the test's temp path.
        """
        self.wait_network_ready()

        sync, params = self.sync, self.params
        self.global_seq = sync.signal_entry("initialized_global")
        group_state = f"initialized_group_{params.test_group_id}"
        self.group_seq = sync.signal_and_wait(
            group_state, params.test_group_instance_count
        )

        print("global_seq:", self.global_seq, "group_seq:", self.group_seq)

        print("start initializing network address")
        network_config = params.network_config(self.global_seq)
        self.config_network(network_config)

        os.environ["TMPDIR"] = params.test_temp_path

    def wait_network_ready(self):
        """Record the "network-initialized" stage, waiting on it if needed.

        The barrier is only awaited when a sidecar is present.
        """
        stage = "network-initialized"
        self.record_stage_start(stage)

        if self.params.test_sidecar:
            self.sync.barrier(stage, self.params.test_instance_count)

        print("network initialisation successful")

        self.record_stage_end(stage)

    def config_network(self, config: dict):
        """Publish a network configuration and wait for its callback state.

        Without a sidecar the request is ignored and `None` is returned.
        `config` must carry a truthy "callback_state" entry.
        """
        if self.params.test_sidecar:
            assert config.get("callback_state"), "no callback state provided"

            topic = "network:" + socket.gethostname()
            return self.sync.publish_and_wait(
                topic,
                config,
                config["callback_state"],
                self.params.test_instance_count,
            )

        print(
            "ignoring network change request; running in a sidecar-less environment"
        )
        return None

    def record_success(self):
        """Signal a success event for this instance's group."""
        event = {
            "success_event": {
                "group": self.params.test_group_id,
            },
        }
        return self.sync.signal_event(event)

    def record_failure(self, error: str):
        """Signal a failure event carrying `error` for this instance's group."""
        event = {
            "failure_event": {
                "group": self.params.test_group_id,
                "error": error,
            },
        }
        return self.sync.signal_event(event)

    def record_stage_start(self, name: str):
        """Signal that the stage called `name` has started."""
        event = {
            "stage_start_event": {
                "name": name,
                "group": self.params.test_group_id,
            },
        }
        self.sync.signal_event(event)

    def record_stage_end(self, name: str):
        """Signal that the stage called `name` has finished."""
        event = {
            "stage_end_event": {
                "name": name,
                "group": self.params.test_group_id,
            }
        }
        self.sync.signal_event(event)

    @property
    def is_leader(self) -> bool:
        """Whether this instance holds the leader's global sequence number.

        Reads `global_seq`, which is assigned by `init_common`.
        """
        return LEADER_GLOBAL_SEQUENCE == self.global_seq

    @property
    def is_validator(self) -> bool:
        """Whether the run parameters mark this instance as a validator."""
        return self.params.is_validator

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the sync service; returning None lets any exception propagate.
        self.sync.close()
60 changes: 60 additions & 0 deletions testground/benchmark/benchmark/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
import subprocess

from .cli import ChainCommand
from .context import Context
from .peer import bootstrap
from .sendtx import sendtx
from .utils import wait_for_block, wait_for_port

# Path of the cronosd binary; used to build the CLI wrapper and to start the
# node process in `entrypoint`.
CRONOSD_PATH = "/bin/cronosd"


def influxdb_url():
    """Return the InfluxDB URL, taken from `INFLUXDB_URL` when it is set."""
    default_url = "http://testground-influxdb:8086"
    return os.getenv("INFLUXDB_URL", default_url)


def entrypoint(ctx: Context):
    """Run the benchmark test case on this instance.

    Performs the common initialisation, bootstraps the chain, starts the
    node, sends transactions from non-validator instances, waits for every
    instance to reach the "halt" state, then stops the node and records
    success.

    The node process is stopped even when one of the steps raises, so a
    failed run does not leave it behind; the exception still propagates and
    no success event is recorded in that case.
    """
    ctx.init_common()

    cli = ChainCommand(CRONOSD_PATH)

    # build the genesis file collectively, and setup the network topology
    peer = bootstrap(ctx, cli)

    # start the node; only the leader keeps the node's stdout
    node_stdout = None if ctx.is_leader else subprocess.DEVNULL
    proc = subprocess.Popen([CRONOSD_PATH, "start"], stdout=node_stdout)
    try:
        wait_for_port(26657)
        wait_for_port(8545)
        wait_for_block(cli, 1)

        if not ctx.is_validator:
            sendtx(cli, peer)

        # halt after all tasks are done
        ctx.sync.signal_and_wait("halt", ctx.params.test_instance_count)
    finally:
        # always stop the node, including when a step above failed
        proc.kill()
        try:
            proc.wait(5)
        except subprocess.TimeoutExpired:
            # best effort: do not block the test on a node that will not exit
            pass
    ctx.record_success()


# Maps a test case name (looked up via `ctx.params.test_case` in `main`) to
# the function that runs it.
TEST_CASES = {
    "entrypoint": entrypoint,
}


def main():
    """Open a `Context` and run the test case named by its run parameters."""
    with Context() as ctx:
        test_case = TEST_CASES[ctx.params.test_case]
        test_case(ctx)


# Run the selected test case when executed as a script.
if __name__ == "__main__":
    main()
26 changes: 26 additions & 0 deletions testground/benchmark/benchmark/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import ipaddress
from typing import List

import netifaces

from .params import RunParams


def get_data_ip(params: RunParams) -> ipaddress.IPv4Address:
    """
    Get the data network IP address

    Without a sidecar the loopback address is returned. Otherwise the first
    local IPv4 address contained in `params.test_subnet` is returned
    (presumably an `ipaddress` network object; confirm against `RunParams`).

    Both branches return an `ipaddress.IPv4Address`, so callers get the same
    type whether or not a sidecar is in use. When no local address lies in
    the subnet, `None` is returned.
    """
    if not params.test_sidecar:
        return ipaddress.IPv4Address("127.0.0.1")

    for addr in ip4_addresses():
        if addr in params.test_subnet:
            return addr

    # no local address belongs to the test subnet
    return None


def ip4_addresses() -> List[ipaddress.IPv4Address]:
    """Return the IPv4 addresses configured on all local network interfaces.

    Interfaces that have no IPv4 address are skipped.
    """
    ip_list = []
    for interface in netifaces.interfaces():
        # `ifaddresses()` only has an AF_INET entry when the interface carries
        # an IPv4 address, so default to an empty list instead of raising
        # KeyError for interfaces without one.
        links = netifaces.ifaddresses(interface).get(netifaces.AF_INET, [])
        ip_list.extend(ipaddress.IPv4Address(link["addr"]) for link in links)
    return ip_list
Loading
Loading