Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Feb 5, 2024
1 parent 8617491 commit 6b304a7
Show file tree
Hide file tree
Showing 19 changed files with 168 additions and 121 deletions.
3 changes: 2 additions & 1 deletion Collections/base_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ class BaseNodes(Collection[BaseNode]):
def __init__(self):
super().__init__()

def add(self):
def add(self) -> str:
id = len(self.items)
addresses = self.get_addresses()
self.items[id] = BaseNode(id, local_ip, addresses)
return self.items[id].name

def get_addresses(self) -> list[str]:
return [self.items[base_node_id].get_address() for base_node_id in self.items]
Expand Down
3 changes: 2 additions & 1 deletion Collections/base_wallets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ class BaseWallets(Collection[BaseWallet]):
def __init__(self):
super().__init__()

def add(self):
def add(self) -> str:
id = len(self.items)
self.items[id] = BaseWallet(id, base_nodes.any().get_address(), local_ip, base_nodes.get_addresses())
return self.items[id].name


base_wallets = BaseWallets()
15 changes: 11 additions & 4 deletions Collections/collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from Processes.common_exec import CommonExec

Item = TypeVar("Item")
Item = TypeVar("Item", bound=CommonExec)


class Collection(ABC, Generic[Item]):
Expand All @@ -12,7 +13,7 @@ def has(self, id: int) -> bool:
return id in self.items

@abstractmethod
def add(self):
def add(self) -> str:
pass

def any(self) -> Item:
Expand All @@ -31,7 +32,13 @@ def __len__(self):
def __getitem__(self, id: int) -> Item:
return self.items[id]

def stop(self, id: int):
def start(self, id: int) -> bool:
if self.has(id):
del self.items[id]
return self.items[id].run()
return False

def stop(self, id: int) -> bool:
if self.has(id) and self.items[id].is_running():
return self.items[id].stop()
print(f"Id ({id}) is invalid, either it never existed or you already stopped it")
return False
3 changes: 2 additions & 1 deletion Collections/dan_wallet_daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ class DanWalletDaemons(Collection[DanWalletDaemon]):
def __init__(self):
super().__init__()

def add(self):
def add(self) -> str:
id = len(self.items)
self.items[id] = DanWalletDaemon(id, indexers[id % len(indexers)].json_rpc_port, signaling_server.json_rpc_port, local_ip)
return self.items[id].name

def jrpc(self, index: int) -> Optional[str]:
if index in self.items:
Expand Down
3 changes: 2 additions & 1 deletion Collections/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ def wait_for_sync(self):
def get_addresses(self) -> list[str]:
return [indexer.get_address() for indexer in self.items.values()]

def add(self):
def add(self) -> str:
id = len(self.items)
self.items[id] = Indexer(id, base_nodes.any().grpc_port, validator_nodes.get_addresses())
return self.items[id].name

def jrpc(self, index: int) -> Optional[str]:
if index in self.items:
Expand Down
5 changes: 3 additions & 2 deletions Collections/validator_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ def wait_for_sync(self):
def get_addresses(self) -> list[str]:
return [self.items[vn_id].get_address() for vn_id in self.items]

def add(self):
def add(self) -> str:
id = len(self.items)
self.items[id] = ValidatorNode(base_nodes.any().grpc_port, base_wallets.any().grpc_port, id, self.get_addresses())
return self.items[id].name

def register(self, claim_public_key: str):
print("Waiting for wallet balance", end=".")
Expand All @@ -44,7 +45,7 @@ def register(self, claim_public_key: str):
time.sleep(1)
print(".", end="")
self.items[vn_id].register(local_ip, claim_public_key)
# Uncomment next line if you want to have only one registeration per block
# Uncomment next line if you want to have only one registration per block
# miner.mine(1)
print("done")

Expand Down
18 changes: 2 additions & 16 deletions Common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
STEP_OUTER_COLOR = COLOR_BRIGHT_GREEN


def is_boolstring(value: str):
def is_boolstring(value: str) -> bool:
return value.lower() in ["false", "true", "0", "1"]


def is_boolstring_true(value: str):
def is_boolstring_true(value: str) -> bool:
return value.lower() in ["true", "1"]


Expand All @@ -48,20 +48,6 @@ def get_env_or_default(env_name: str, default: Any, validation: Any = None) -> A
DELETE_EVERYTHING_BUT_TEMPLATES_BEFORE = True
DELETE_STDOUT_LOGS = True
DELETE_TEMPLATES = False
REDIRECT_BASE_NODE_STDOUT = True
REDIRECT_WALLET_STDOUT = True
REDIRECT_MINER_STDOUT = True
# how many VNs should print to console
REDIRECT_VN_FROM_INDEX_STDOUT = 0
# how many dan wallets should print to console
REDIRECT_DAN_WALLET_STDOUT = 0
# The register vn cli is redirected as VN, this is for the publish template etc.
REDIRECT_INDEXER_STDOUT = 0
# This is for the cargo generate and compilation for the template
REDIRECT_CARGO_INSTALL_CARGO_GENERATE_STDOUT = True
REDIRECT_TEMPLATE_STDOUT = True
REDIRECT_DAN_WALLET_WEBUI_STDOUT = True
REDIRECT_SIGNALING_STDOUT = True
NETWORK = "localnet"
SPAWN_VNS = int(get_env_or_default("DAN_TESTING_SPAWN_VNS", 1))
print("SPAWN_VNS", SPAWN_VNS)
Expand Down
19 changes: 14 additions & 5 deletions Processes/base_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from grpc import insecure_channel # type:ignore

from protos import types_pb2, base_node_pb2_grpc, base_node_pb2, network_pb2, block_pb2
from Common.config import TARI_BINS_FOLDER, NETWORK, REDIRECT_BASE_NODE_STDOUT, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Common.config import TARI_BINS_FOLDER, NETWORK, USE_BINARY_EXECUTABLE, DATA_FOLDER
import os
import re
import time
from typing import Iterator, Any
from typing import Iterator, Any, Optional
from Processes.common_exec import CommonExec


Expand Down Expand Up @@ -68,6 +68,7 @@ def search_kernels(self, signatures: list[types_pb2.Signature]) -> Iterator[bloc
class BaseNode(CommonExec):
def __init__(self, id: int, local_ip: str, peer_seeds: list[str] = []):
super().__init__("BaseNode", id)
self.local_ip = local_ip
self.public_port = self.get_port("public_address")
self.public_address = f"/ip4/{local_ip}/tcp/{self.public_port}"
self.grpc_port = self.get_port("GRPC")
Expand Down Expand Up @@ -103,24 +104,32 @@ def __init__(self, id: int, local_ip: str, peer_seeds: list[str] = []):
if peer_seeds:
self.exec.append("-p")
self.exec.append(f"{NETWORK}.p2p.seeds.peer_seeds={','.join(peer_seeds)}")
self.run(REDIRECT_BASE_NODE_STDOUT)
self.run()

def run(self, cwd: Optional[str] = None) -> bool:
if not super().run():
return False
# Sometimes it takes a while to establish the grpc connection
while True:
try:
self.grpc_client = GrpcBaseNode(f"{local_ip}:{self.grpc_port}")
self.grpc_client = GrpcBaseNode(f"{self.local_ip}:{self.grpc_port}")
self.grpc_client.get_version()
break
except Exception as e:
print(e)
time.sleep(1)
return True

def get_address(self) -> str:
base_node_id_file_name = os.path.join(DATA_FOLDER, self.name, NETWORK, "config", "base_node_id.json")
while not os.path.exists(base_node_id_file_name):
time.sleep(1)
f = open(base_node_id_file_name, "rt")
content = "".join(f.readlines())
node_id, public_key, public_address = re.search(r'"node_id":"(.*?)","public_key":"(.*?)".*"public_addresses":\["(.*?)"', content).groups()
parsed = re.search(r'"node_id":"(.*?)","public_key":"(.*?)".*"public_addresses":\["(.*?)"', content)
if not parsed:
return ""
_, public_key, public_address = parsed.groups()
public_address = public_address.replace("\\/", "/")
return f"{public_key}::{public_address}"

Expand Down
12 changes: 9 additions & 3 deletions Processes/base_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
print("You forgot to generate protos, run protos.sh or protos.bat")
exit()

from Common.config import TARI_BINS_FOLDER, NETWORK, REDIRECT_WALLET_STDOUT, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Common.config import TARI_BINS_FOLDER, NETWORK, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Processes.common_exec import CommonExec
import time
import os
Expand Down Expand Up @@ -51,6 +51,7 @@ def create_burn_transaction(
class BaseWallet(CommonExec):
def __init__(self, id: int, base_node_address: str, local_ip: str, peer_seeds: list[str] = []):
super().__init__("BaseWallet", id)
self.local_ip = local_ip
self.public_port = self.get_port("public_address")
self.public_address = f"/ip4/{local_ip}/tcp/{self.public_port}"
self.grpc_port = self.get_port("GRPC")
Expand Down Expand Up @@ -87,16 +88,21 @@ def __init__(self, id: int, base_node_address: str, local_ip: str, peer_seeds: l
self.exec.append("-p")
self.exec.append(f"{NETWORK}.p2p.seeds.peer_seeds={','.join(peer_seeds)}")

self.run(REDIRECT_WALLET_STDOUT)
self.run()

def run(self, cwd: str | None = None) -> bool:
if not super().run(cwd):
return False
# Sometimes it takes a while to establish the grpc connection
while True:
try:
self.grpc_client = GrpcWallet(f"{local_ip}:{self.grpc_port}")
self.grpc_client = GrpcWallet(f"{self.local_ip}:{self.grpc_port}")
self.grpc_client.get_version()
break
except Exception as e:
print(e)
time.sleep(1)
return True

def get_info_for_ui(self) -> dict[str, Any]:
return {"name": self.name, "grpc": self.grpc_client.address}
15 changes: 9 additions & 6 deletions Processes/common_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from Processes.subprocess_wrapper import SubprocessWrapper
import subprocess
from Common.ports import ports
from typing import Optional, Any, Union
from typing import Optional, Any
from Common.config import NAME_COLOR, COLOR_RESET, EXEC_COLOR, DATA_FOLDER
import sys

Expand All @@ -18,12 +18,13 @@ def __init__(self, name: str, id: Optional[int] = None):
self.name = f"{name}_{id}"
self.exec = ""
self.process: Optional[subprocess.Popen[Any]] = None
self.running = False

def get_port(self, interface: str) -> int:
return ports.get_free_port(f"{self.name} {interface}")

def run(self, redirect: Union[bool, int], cwd: Optional[str] = None):
def run(self, cwd: Optional[str] = None) -> bool:
if self.is_running():
return False
env: dict[str, str] = os.environ.copy()
self.process = SubprocessWrapper.Popen(
self.exec,
Expand All @@ -33,16 +34,16 @@ def run(self, redirect: Union[bool, int], cwd: Optional[str] = None):
env={**env, **self.env},
cwd=cwd,
)
self.running = True
return True

def is_running(self) -> bool:
if self.process:
return self.process.poll() is None
return False

def stop(self):
def stop(self) -> bool:
if not self.process or not self.is_running():
return
return False
print(f"Kill {NAME_COLOR}{self.name}{COLOR_RESET}")
print(f"To run {EXEC_COLOR}{' '.join(self.exec).replace(' -n', '')}{COLOR_RESET}", end=" ")
if self.env:
Expand All @@ -54,6 +55,8 @@ def stop(self):
# This closes the process correctly
self.process.send_signal(signal.SIGINT)
del self.process
self.process = None
return True

def __del__(self):
self.stop()
Expand Down
12 changes: 9 additions & 3 deletions Processes/dan_wallet_daemon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Common.config import TARI_DAN_BINS_FOLDER, REDIRECT_DAN_WALLET_STDOUT, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Common.config import TARI_DAN_BINS_FOLDER, USE_BINARY_EXECUTABLE, DATA_FOLDER
import base64
import os
import requests
Expand Down Expand Up @@ -182,16 +182,22 @@ def __init__(self, dan_wallet_id: int, indexer_jrpc_port: int, signaling_server_
]
if signaling_server_port:
self.exec = [*self.exec, "--signaling-server-address", f"{local_ip}:{signaling_server_port}"]
self.run(REDIRECT_DAN_WALLET_STDOUT)
self.run()

def run(self, cwd: str | None = None) -> bool:
if not super().run(cwd):
return False
if not self.process:
return False
# (out, err) = self.process.communicate()
self.jrpc_client = JrpcDanWalletDaemon(f"http://{self.json_connect_address}")
while not os.path.exists(os.path.join(DATA_FOLDER, self.name, "localnet", "pid")):
if self.process.poll() is None:
time.sleep(1)
else:
raise Exception(f"DAN wallet did not start successfully: Exit code:{self.process.poll()}")
print(f"Dan wallet daemon {dan_wallet_id} started")
print(f"Dan wallet daemon {self.name} started")
return True

def get_info_for_ui(self):
return {"name": self.name, "http": self.http_connect_address, "jrpc": self.json_connect_address}
12 changes: 9 additions & 3 deletions Processes/indexer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# type:ignore

from Common.config import TARI_DAN_BINS_FOLDER, NETWORK, REDIRECT_INDEXER_STDOUT, USE_BINARY_EXECUTABLE, DATA_FOLDER
from typing import Optional
from Common.config import TARI_DAN_BINS_FOLDER, NETWORK, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Common.ports import ports
import os
import time
Expand Down Expand Up @@ -62,14 +63,19 @@ def __init__(self, indexer_id: int, base_node_grpc_port: int, peers=[]):
"-p",
f"indexer.ui_connect_address=http://{self.json_connect_address}",
]
self.run(REDIRECT_INDEXER_STDOUT)
self.run()

def run(self, cwd: str | None = None) -> bool:
if not super().run(cwd):
return False
self.jrpc_client = JrpcIndexer(f"http://{self.json_connect_address}")
while not os.path.exists(os.path.join(DATA_FOLDER, f"indexer_{self.id}", "localnet", "pid")):
if self.process.poll() is None:
time.sleep(1)
else:
raise Exception(f"Indexer did not start successfully: Exit code:{self.process.poll()}")
print(f"Indexer {self.id} started")
print(f"Indexer {self.name} started")
return True

def get_address(self):
if NETWORK == "localnet":
Expand Down
7 changes: 2 additions & 5 deletions Processes/miner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Common.config import TARI_BINS_FOLDER, NETWORK, REDIRECT_MINER_STDOUT, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Common.config import TARI_BINS_FOLDER, NETWORK, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Processes.subprocess_wrapper import SubprocessWrapper
import subprocess
import os
Expand Down Expand Up @@ -36,10 +36,7 @@ def mine(self, blocks: int):
for _ in range(blocks):
self.exec = list(self.exec_template)
self.exec[self.exec.index("#blocks")] = "1"
if REDIRECT_MINER_STDOUT:
self.process = SubprocessWrapper.call(self.exec, stdout=open(os.path.join(DATA_FOLDER, "stdout", "shamin.log"), "a+"), stderr=subprocess.STDOUT)
else:
self.process = SubprocessWrapper.call(self.exec)
self.process = SubprocessWrapper.call(self.exec, stdout=open(os.path.join(DATA_FOLDER, "stdout", "shamin.log"), "a+"), stderr=subprocess.STDOUT)
time.sleep(1)

def get_logs(self):
Expand Down
4 changes: 2 additions & 2 deletions Processes/signaling_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# type:ignore
import os
import time
from Common.config import TARI_DAN_BINS_FOLDER, USE_BINARY_EXECUTABLE, REDIRECT_SIGNALING_STDOUT, DATA_FOLDER
from Common.config import TARI_DAN_BINS_FOLDER, USE_BINARY_EXECUTABLE, DATA_FOLDER
from Processes.common_exec import CommonExec


Expand All @@ -25,7 +25,7 @@ def start(self, local_ip: str):
"--listen-addr",
f"{local_ip}:{self.json_rpc_port}",
]
self.run(REDIRECT_SIGNALING_STDOUT)
self.run()
print("Waiting for signaling server to start.", end="")
while not os.path.exists(os.path.join(DATA_FOLDER, "signaling_server", "pid")):
print(".", end="")
Expand Down
Loading

0 comments on commit 6b304a7

Please sign in to comment.