Retire DReps during testnet cleanup #2787

Merged
2 commits merged on Nov 21, 2024
14 changes: 10 additions & 4 deletions cardano_node_tests/testnet_cleanup.py
@@ -3,6 +3,7 @@

* withdraw rewards
* deregister stake addresses
* retire DReps
* return funds to faucet
"""

@@ -33,13 +34,11 @@ def get_args() -> argparse.Namespace:
parser.add_argument(
"-f",
"--address",
required=True,
help="Faucet address",
)
parser.add_argument(
"-s",
"--skey-file",
required=True,
type=helpers.check_file_arg,
help="Path to faucet skey file",
)
@@ -57,8 +56,15 @@ def main() -> int:
if not socket_env:
LOGGER.error("The `CARDANO_NODE_SOCKET_PATH` environment variable is not set.")
return 1
if not os.environ.get("BOOTSTRAP_DIR"):
LOGGER.error("The `BOOTSTRAP_DIR` environment variable is not set.")
if bool(args.address) ^ bool(args.skey_file):
LOGGER.error(
"Both address and skey file must be provided, or neither of them should be provided."
)
return 1
if not (args.address or os.environ.get("BOOTSTRAP_DIR")):
LOGGER.error(
"The address must be provided, or `BOOTSTRAP_DIR` environment variable must be set."
)
return 1

state_dir = pl.Path(socket_env).parent
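With `--address` and `--skey-file` no longer required, the script accepts either both faucet options together or neither of them, in which case `BOOTSTRAP_DIR` must point at the bootstrap artifacts. A minimal sketch of that acceptance logic follows; the helper and the sample values are illustrative and not part of the PR:

import os

def _faucet_args_ok(address: str, skey_file: str) -> bool:
    # Exactly one of the two options without the other is rejected (the XOR check above)
    if bool(address) ^ bool(skey_file):
        return False
    # Neither option given: fall back to the BOOTSTRAP_DIR cluster environment
    return bool(address or os.environ.get("BOOTSTRAP_DIR"))

assert _faucet_args_ok("addr_test1...", "faucet.skey")  # explicit faucet address and skey
assert not _faucet_args_ok("addr_test1...", "")  # address without skey file is invalid
assert _faucet_args_ok("", "") == bool(os.environ.get("BOOTSTRAP_DIR"))  # cluster env fallback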
199 changes: 154 additions & 45 deletions cardano_node_tests/utils/testnet_cleanup.py
@@ -2,13 +2,16 @@

* withdraw rewards
* deregister stake addresses
* retire DReps
* return funds to faucet
"""

import concurrent.futures
import functools
import itertools
import logging
import pathlib as pl
import queue
import random
import time
import typing as tp
@@ -78,19 +81,51 @@ def deregister_stake_addr(
LOGGER.info(f"Deregistered stake address '{pool_user.stake.address}'")


def retire_drep(
cluster_obj: clusterlib.ClusterLib,
payment_addr: clusterlib.AddressRecord,
drep_keys: clusterlib.KeyPair,
name_template: str,
deposit_amt: int,
) -> None:
"""Retire a DRep."""
ret_cert = cluster_obj.g_conway_governance.drep.gen_retirement_cert(
cert_name=f"{name_template}_cleanup",
deposit_amt=deposit_amt,
drep_vkey_file=drep_keys.vkey_file,
)
tx_files = clusterlib.TxFiles(
certificate_files=[ret_cert],
signing_key_files=[payment_addr.skey_file, drep_keys.skey_file],
)

try:
cluster_obj.g_transaction.send_tx(
src_address=payment_addr.address,
tx_name=f"{name_template}_retire_drep",
tx_files=tx_files,
deposit=-deposit_amt,
)
except clusterlib.CLIError:
LOGGER.error(f"Failed to retire a DRep '{name_template}'") # noqa: TRY400
else:
LOGGER.info(f"Retired a DRep '{name_template}'")


def return_funds_to_faucet(
cluster_obj: clusterlib.ClusterLib,
src_addr: clusterlib.AddressRecord,
src_addrs: tp.List[clusterlib.AddressRecord],
faucet_address: str,
tx_name: str,
) -> None:
"""Send funds from `src_addr` to `faucet_address`."""
tx_name = f"rf_{tx_name}_return_funds"
"""Send funds from `src_addr`s to `faucet_address`."""
tx_name = f"rf_{tx_name}"
# the amount of "-1" means all available funds.
fund_dst = [clusterlib.TxOut(address=faucet_address, amount=-1)]
fund_tx_files = clusterlib.TxFiles(signing_key_files=[src_addr.skey_file])
fund_tx_files = clusterlib.TxFiles(signing_key_files=[f.skey_file for f in src_addrs])

txins = cluster_obj.g_query.get_utxo(address=src_addr.address, coins=[clusterlib.DEFAULT_COIN])
txins_nested = [cluster_obj.g_query.get_utxo(address=f.address) for f in src_addrs]
txins = list(itertools.chain.from_iterable(txins_nested))
utxos_balance = functools.reduce(lambda x, y: x + y.amount, txins, 0)

# Skip if there is no (or too little) Lovelace
@@ -109,17 +144,17 @@ def return_funds_to_faucet(
# try to return funds; don't mind if there's not enough funds for fees etc.
try:
cluster_obj.g_transaction.send_tx(
src_address=src_addr.address,
src_address=src_addrs[0].address,
tx_name=tx_name,
txins=txins,
txouts=fund_dst,
tx_files=fund_tx_files,
verify_tx=False,
)
except clusterlib.CLIError:
LOGGER.error(f"Failed to return funds from '{src_addr.address}'") # noqa: TRY400
LOGGER.error(f"Failed to return funds from addresses for '{tx_name}'") # noqa: TRY400
else:
LOGGER.info(f"Returned funds from '{src_addr.address}'")
LOGGER.info(f"Returned funds from addresses '{tx_name}'")


def create_addr_record(addr_file: pl.Path) -> clusterlib.AddressRecord:
@@ -141,13 +176,17 @@ def create_addr_record(addr_file: pl.Path) -> clusterlib.AddressRecord:
return addr_record


def find_files(location: clusterlib.FileType) -> tp.Generator[pl.Path, None, None]:
def find_addr_files(location: pl.Path) -> tp.Generator[pl.Path, None, None]:
r"""Find all '\*.addr' files in given location and it's subdirectories."""
location = pl.Path(location).expanduser().resolve()
return location.glob("**/*.addr")


def group_files(file_paths: tp.Generator[pl.Path, None, None]) -> tp.List[tp.List[pl.Path]]:
def find_cert_files(location: pl.Path) -> tp.Generator[pl.Path, None, None]:
r"""Find all '\*_drep_reg.cert' files in given location and it's subdirectories."""
return location.glob("**/*_drep_reg.cert")
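For reference, the artifact naming that `find_cert_files` and `cleanup_certs` rely on looks roughly like this (paths are illustrative):

# <location>/<test_dir>/addr0_drep_reg.cert  -> matched by '**/*_drep_reg.cert'
# <location>/<test_dir>/addr0_drep.vkey      -> derived via name.replace("_reg.cert", ".vkey")
# <location>/<test_dir>/addr0_drep.skey      -> derived via vkey_file.with_suffix(".skey")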


def group_addr_files(file_paths: tp.Generator[pl.Path, None, None]) -> tp.List[tp.List[pl.Path]]:
"""Group payment address files with corresponding stake address files.

These need to be processed together - funds are transferred from payment address after
@@ -172,43 +211,16 @@ def group_files(file_paths: tp.Generator[pl.Path, None, None]) -> tp.List[tp.List[pl.Path]]:
return path_groups


def _get_faucet_payment_rec(
address: str = "",
skey_file: clusterlib.FileType = "",
) -> clusterlib.AddressRecord:
if address or skey_file:
if not (address and skey_file):
err = "Both 'address' and 'skey_file' need to be set."
raise ValueError(err)

faucet_payment = clusterlib.AddressRecord(
address=address,
vkey_file=pl.Path("/nonexistent"), # We don't need this for faucet
skey_file=pl.Path(skey_file),
)
else:
# Try to infer the faucet address and keys from cluster env
cluster_env = cluster_nodes.get_cluster_env()
faucet_addr_file = cluster_env.state_dir / "shelley" / "faucet.addr"
faucet_payment = create_addr_record(faucet_addr_file)

return faucet_payment


def cleanup(
cluster_obj: clusterlib.ClusterLib,
location: clusterlib.FileType,
faucet_address: str = "",
faucet_skey_file: clusterlib.FileType = "",
def cleanup_addresses(
cluster_obj: clusterlib.ClusterLib, location: pl.Path, faucet_payment: clusterlib.AddressRecord
) -> None:
"""Cleanup a testnet with the help of testing artifacts."""
faucet_payment = _get_faucet_payment_rec(address=faucet_address, skey_file=faucet_skey_file)
files_found = group_files(find_files(location))
"""Cleanup addresses."""
files_found = group_addr_files(find_addr_files(location))
stake_deposit_amt = cluster_obj.g_query.get_address_deposit()

def _run(files: tp.List[pl.Path]) -> None:
for fpath in files:
# add random sleep for < 1s to prevent
# Add random sleep for < 1s to prevent
# "Network.Socket.connect: <socket: 11>: resource exhausted"
time.sleep(random.random())

@@ -252,16 +264,113 @@ def _run(files: tp.List[pl.Path]) -> None:
continue
return_funds_to_faucet(
cluster_obj=cluster_obj,
src_addr=payment,
src_addrs=[payment],
faucet_address=faucet_payment.address,
tx_name=f_name,
)

# run cleanup in parallel
# Run cleanup in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(_run, f) for f in files_found]
concurrent.futures.wait(futures)


def cleanup_certs(
cluster_obj: clusterlib.ClusterLib, location: pl.Path, faucet_payment: clusterlib.AddressRecord
) -> None:
"""Cleanup DRep certs."""
files_found = find_cert_files(location)
drep_deposit_amt = cluster_obj.conway_genesis["dRepDeposit"]

# Fund the addresses that will pay for fees
fund_addrs = [
cluster_obj.g_address.gen_payment_addr_and_keys(name=f"certs_cleanup{i}") for i in range(11)
]
fund_dst = [clusterlib.TxOut(address=f.address, amount=300_000_000) for f in fund_addrs]
fund_tx_files = clusterlib.TxFiles(signing_key_files=[faucet_payment.skey_file])
cluster_obj.g_transaction.send_tx(
src_address=faucet_payment.address,
tx_name="fund_certs_cleanup",
txouts=fund_dst,
tx_files=fund_tx_files,
)

addrs_queue: queue.Queue[clusterlib.AddressRecord] = queue.Queue()
for a in fund_addrs:
addrs_queue.put(a)

def _run(cert_file: pl.Path, addrs_queue: queue.Queue[clusterlib.AddressRecord]) -> None:
# Add random sleep for < 1s to prevent
# "Network.Socket.connect: <socket: 11>: resource exhausted"
time.sleep(random.random())

fname = cert_file.name
fdir = cert_file.parent
vkey_file = fdir / cert_file.name.replace("_reg.cert", ".vkey")
skey_file = vkey_file.with_suffix(".skey")
drep_keys = clusterlib.KeyPair(vkey_file=vkey_file, skey_file=skey_file)

addr = addrs_queue.get()
try:
retire_drep(
cluster_obj=cluster_obj,
payment_addr=addr,
drep_keys=drep_keys,
name_template=fname,
deposit_amt=drep_deposit_amt,
)
finally:
addrs_queue.put(addr)

# Run cleanup in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(_run, f, addrs_queue) for f in files_found]
concurrent.futures.wait(futures)

# Return funds from the addresses that paid for fees
return_funds_to_faucet(
cluster_obj=cluster_obj,
src_addrs=fund_addrs,
faucet_address=faucet_payment.address,
tx_name="certs_cleanup_return",
)
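The fee-paying addresses are handed out to worker threads through a shared queue, so each in-flight retirement transaction spends from its own address and UTxO contention is avoided. A stand-alone sketch of that pattern, with a dummy task in place of `retire_drep`:

import concurrent.futures
import queue

def _task(item: str, addr: str) -> None:
    print(f"processing {item} with fee payer {addr}")  # placeholder for retire_drep(...)

def _worker(item: str, addrs: "queue.Queue[str]") -> None:
    addr = addrs.get()  # borrow a fee-paying address
    try:
        _task(item, addr)
    finally:
        addrs.put(addr)  # always hand the address back for other threads

addrs_q: "queue.Queue[str]" = queue.Queue()
for a in ("payer0", "payer1", "payer2"):
    addrs_q.put(a)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(_worker, i, addrs_q) for i in ("cert0", "cert1", "cert2")]
    concurrent.futures.wait(futures)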


def _get_faucet_payment_rec(
address: str = "",
skey_file: clusterlib.FileType = "",
) -> clusterlib.AddressRecord:
if address or skey_file:
if not (address and skey_file):
err = "Both 'address' and 'skey_file' need to be set."
raise ValueError(err)

faucet_payment = clusterlib.AddressRecord(
address=address,
vkey_file=pl.Path("/nonexistent"), # We don't need this for faucet
skey_file=pl.Path(skey_file),
)
else:
# Try to infer the faucet address and keys from cluster env
cluster_env = cluster_nodes.get_cluster_env()
faucet_addr_file = cluster_env.state_dir / "shelley" / "faucet.addr"
faucet_payment = create_addr_record(faucet_addr_file)

return faucet_payment


def cleanup(
cluster_obj: clusterlib.ClusterLib,
location: clusterlib.FileType,
faucet_address: str = "",
faucet_skey_file: clusterlib.FileType = "",
) -> None:
"""Cleanup a testnet with the help of testing artifacts."""
location = pl.Path(location).expanduser().resolve()
faucet_payment = _get_faucet_payment_rec(address=faucet_address, skey_file=faucet_skey_file)
cleanup_addresses(cluster_obj=cluster_obj, location=location, faucet_payment=faucet_payment)
cleanup_certs(cluster_obj=cluster_obj, location=location, faucet_payment=faucet_payment)

# Defragment faucet address UTxOs
defragment_utxos.defragment(
cluster_obj=cluster_obj, address=faucet_payment.address, skey_file=faucet_payment.skey_file
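A hypothetical end-to-end invocation of the public entry point, assuming `cluster_obj` is an already-initialized `clusterlib.ClusterLib` instance; the location, address, and key file below are illustrative:

cleanup(
    cluster_obj=cluster_obj,
    location="artifacts/",  # directory holding *.addr and *_drep_reg.cert files
    faucet_address="addr_test1...",  # optional; inferred from the cluster env when omitted
    faucet_skey_file="faucet.skey",  # must be provided together with faucet_address
)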