diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index f5cc084e9..59296eb3d 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,5 +1,5 @@
name: Test
-on: [push, pull_request]
+on: [push]
env:
ETH_PRIVATE_KEY: ${{ secrets.ETH_PRIVATE_KEY }}
SCHAIN_TYPE: ${{ secrets.SCHAIN_TYPE }}
diff --git a/core/schains/checks.py b/core/schains/checks.py
index 8f4b0d9ee..0202ed22d 100644
--- a/core/schains/checks.py
+++ b/core/schains/checks.py
@@ -41,7 +41,7 @@
from core.schains.dkg.utils import get_secret_key_share_filepath
from core.schains.firewall.types import IRuleController
from core.schains.ima import get_ima_time_frame, get_migration_ts as get_ima_migration_ts
-from core.schains.process_manager_helper import is_monitor_process_alive
+from core.schains.process import is_monitor_process_alive
from core.schains.rpc import (
check_endpoint_alive,
check_endpoint_blocks,
diff --git a/core/schains/cleaner.py b/core/schains/cleaner.py
index dcf4b52b8..f006d439a 100644
--- a/core/schains/cleaner.py
+++ b/core/schains/cleaner.py
@@ -37,7 +37,7 @@
get_node_ips_from_config,
get_own_ip_from_config,
)
-from core.schains.process_manager_helper import terminate_schain_process
+from core.schains.process import ProcessReport, terminate_process
from core.schains.runner import get_container_name, is_exited
from core.schains.external_config import ExternalConfig
from core.schains.types import ContainerType
@@ -109,12 +109,15 @@ def monitor(skale, node_config, dutils=None):
for schain_name in schains_on_node:
if schain_name not in schain_names_on_contracts:
- logger.warning(f'sChain {schain_name} was found on node, but not on contracts: \
-{schain_names_on_contracts}, going to remove it!')
+ logger.warning(
+                '%s was found on node, but not on contracts: %s, trying to clean up',
+ schain_name,
+ schain_names_on_contracts,
+ )
try:
ensure_schain_removed(skale, schain_name, node_config.id, dutils=dutils)
except Exception:
- logger.exception(f'sChain removal {schain_name} failed')
+ logger.exception('%s removal failed', schain_name)
logger.info('Cleanup procedure finished')
@@ -185,9 +188,10 @@ def remove_schain(
msg: str,
dutils: Optional[DockerUtils] = None,
) -> None:
- schain_record = upsert_schain_record(schain_name)
logger.warning(msg)
- terminate_schain_process(schain_record)
+ report = ProcessReport(name=schain_name)
+ if report.is_exist():
+ terminate_process(report.pid)
delete_bls_keys(skale, schain_name)
sync_agent_ranges = get_sync_agent_ranges(skale)
@@ -240,9 +244,7 @@ def cleanup_schain(
)
check_status = checks.get_all()
if check_status['skaled_container'] or is_exited(
- schain_name,
- container_type=ContainerType.schain,
- dutils=dutils
+ schain_name, container_type=ContainerType.schain, dutils=dutils
):
remove_schain_container(schain_name, dutils=dutils)
if check_status['volume']:
@@ -259,9 +261,7 @@ def cleanup_schain(
rc.cleanup()
if estate is not None and estate.ima_linked:
if check_status.get('ima_container', False) or is_exited(
- schain_name,
- container_type=ContainerType.ima,
- dutils=dutils
+ schain_name, container_type=ContainerType.ima, dutils=dutils
):
remove_ima_container(schain_name, dutils=dutils)
if check_status['config_dir']:
diff --git a/core/schains/monitor/action.py b/core/schains/monitor/action.py
index b02cbab44..75dd0d5d1 100644
--- a/core/schains/monitor/action.py
+++ b/core/schains/monitor/action.py
@@ -212,7 +212,7 @@ def dkg(self) -> bool:
def upstream_config(self) -> bool:
with self.statsd_client.timer(f'admin.action.upstream_config.{no_hyphens(self.name)}'):
logger.info(
- 'Creating new upstream_config rotation_id: %s, stream: %s',
+ 'Generating new upstream_config rotation_id: %s, stream: %s',
self.rotation_data.get('rotation_id'), self.stream_version
)
new_config = create_new_upstream_config(
@@ -229,6 +229,7 @@ def upstream_config(self) -> bool:
result = False
if not self.cfm.upstream_config_exists() or \
new_config != self.cfm.latest_upstream_config:
+ logger.info('Saving new config')
rotation_id = self.rotation_data['rotation_id']
logger.info(
'Saving new upstream config rotation_id: %d, ips: %s',
diff --git a/core/schains/monitor/config_monitor.py b/core/schains/monitor/config_monitor.py
index 47587a1bc..639689870 100644
--- a/core/schains/monitor/config_monitor.py
+++ b/core/schains/monitor/config_monitor.py
@@ -45,6 +45,8 @@ def run(self):
self.execute()
self.am.log_executed_blocks()
self.am._upd_last_seen()
+ except Exception as e:
+ logger.info('Config monitor type failed %s', typename, exc_info=e)
finally:
logger.info('Config monitor type finished %s', typename)
diff --git a/core/schains/monitor/main.py b/core/schains/monitor/main.py
index 0975ab23c..b3c589724 100644
--- a/core/schains/monitor/main.py
+++ b/core/schains/monitor/main.py
@@ -18,13 +18,11 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import functools
-import time
-import random
import logging
-from typing import Dict
-from concurrent.futures import Future, ThreadPoolExecutor
+import os
+import time
+from typing import Callable, Optional
from importlib import reload
-from typing import List, Optional
from skale import Skale, SkaleIma
from skale.contracts.manager.schains import SchainStructure
@@ -34,51 +32,55 @@
from core.node_config import NodeConfig
from core.schains.checks import ConfigChecks, get_api_checks_status, TG_ALLOWED_CHECKS, SkaledChecks
from core.schains.config.file_manager import ConfigFileManager
+from core.schains.config.static_params import get_automatic_repair_option
from core.schains.firewall import get_default_rule_controller
from core.schains.firewall.utils import get_sync_agent_ranges
+from core.schains.external_config import ExternalConfig, ExternalState
from core.schains.monitor import get_skaled_monitor, RegularConfigMonitor, SyncConfigMonitor
from core.schains.monitor.action import ConfigActionManager, SkaledActionManager
-from core.schains.external_config import ExternalConfig, ExternalState
-from core.schains.task import keep_tasks_running, Task
-from core.schains.config.static_params import get_automatic_repair_option
+from core.schains.monitor.tasks import execute_tasks, Future, ITask
+from core.schains.process import ProcessReport
from core.schains.status import get_node_cli_status, get_skaled_status
from core.node import get_current_nodes
from tools.docker_utils import DockerUtils
from tools.configs import SYNC_NODE
+from tools.configs.schains import DKG_TIMEOUT_COEFFICIENT
from tools.notifications.messages import notify_checks
from tools.helper import is_node_part_of_chain, no_hyphens
from tools.resources import get_statsd_client
-from web.models.schain import SChainRecord
+from web.models.schain import SChainRecord, upsert_schain_record
-MIN_SCHAIN_MONITOR_SLEEP_INTERVAL = 20
-MAX_SCHAIN_MONITOR_SLEEP_INTERVAL = 40
+logger = logging.getLogger(__name__)
-SKALED_PIPELINE_SLEEP = 2
-CONFIG_PIPELINE_SLEEP = 3
-logger = logging.getLogger(__name__)
+class NoTasksToRunError(Exception):
+ pass
def run_config_pipeline(
- skale: Skale, skale_ima: SkaleIma, schain: Dict, node_config: NodeConfig, stream_version: str
+ schain_name: str,
+ skale: Skale,
+ skale_ima: SkaleIma,
+ node_config: NodeConfig,
+ stream_version: str,
) -> None:
- name = schain.name
- schain_record = SChainRecord.get_by_name(name)
- rotation_data = skale.node_rotation.get_rotation(name)
+ schain = skale.schains.get_by_name(schain_name)
+ schain_record = SChainRecord.get_by_name(schain_name)
+ rotation_data = skale.node_rotation.get_rotation(schain_name)
allowed_ranges = get_sync_agent_ranges(skale)
- ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(name)
- group_index = skale.schains.name_to_group_id(name)
+ ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(schain_name)
+ group_index = skale.schains.name_to_group_id(schain_name)
last_dkg_successful = skale.dkg.is_last_dkg_successful(group_index)
- current_nodes = get_current_nodes(skale, name)
+ current_nodes = get_current_nodes(skale, schain_name)
estate = ExternalState(
ima_linked=ima_linked, chain_id=skale_ima.web3.eth.chain_id, ranges=allowed_ranges
)
- econfig = ExternalConfig(name)
+ econfig = ExternalConfig(schain_name)
config_checks = ConfigChecks(
- schain_name=name,
+ schain_name=schain_name,
node_id=node_config.id,
schain_record=schain_record,
stream_version=stream_version,
@@ -114,24 +116,25 @@ def run_config_pipeline(
mon = RegularConfigMonitor(config_am, config_checks)
statsd_client = get_statsd_client()
- statsd_client.incr(f'admin.config_pipeline.{mon.__class__.__name__}.{no_hyphens(name)}')
+ statsd_client.incr(f'admin.config_pipeline.{mon.__class__.__name__}.{no_hyphens(schain_name)}')
statsd_client.gauge(
- f'admin.config_pipeline.rotation_id.{no_hyphens(name)}', rotation_data['rotation_id']
+ f'admin.config_pipeline.rotation_id.{no_hyphens(schain_name)}', rotation_data['rotation_id']
)
- with statsd_client.timer(f'admin.config_pipeline.duration.{no_hyphens(name)}'):
+ with statsd_client.timer(f'admin.config_pipeline.duration.{no_hyphens(schain_name)}'):
mon.run()
def run_skaled_pipeline(
- skale: Skale, schain: SchainStructure, node_config: NodeConfig, dutils: DockerUtils
+ schain_name: str, skale: Skale, node_config: NodeConfig, dutils: DockerUtils
) -> None:
- name = schain.name
- schain_record = SChainRecord.get_by_name(name)
+ schain = skale.schains.get_by_name(schain_name)
+ schain_record = SChainRecord.get_by_name(schain_name)
+
logger.info('Record: %s', SChainRecord.to_dict(schain_record))
dutils = dutils or DockerUtils()
- rc = get_default_rule_controller(name=name)
+ rc = get_default_rule_controller(name=schain_name)
skaled_checks = SkaledChecks(
schain_name=schain.name,
schain_record=schain_record,
@@ -140,8 +143,8 @@ def run_skaled_pipeline(
sync_node=SYNC_NODE,
)
- skaled_status = get_skaled_status(name)
- ncli_status = get_node_cli_status(name)
+ skaled_status = get_skaled_status(schain_name)
+ ncli_status = get_node_cli_status(schain_name)
skaled_am = SkaledActionManager(
schain=schain,
@@ -149,13 +152,13 @@ def run_skaled_pipeline(
checks=skaled_checks,
node_config=node_config,
ncli_status=ncli_status,
- econfig=ExternalConfig(name),
+ econfig=ExternalConfig(schain_name),
dutils=dutils,
)
check_status = skaled_checks.get_all(log=False, expose=True)
automatic_repair = get_automatic_repair_option()
api_status = get_api_checks_status(status=check_status, allowed=TG_ALLOWED_CHECKS)
- notify_checks(name, node_config.all(), api_status)
+ notify_checks(schain_name, node_config.all(), api_status)
logger.info('Skaled check status: %s', check_status)
@@ -167,36 +170,153 @@ def run_skaled_pipeline(
schain_record=schain_record,
skaled_status=skaled_status,
ncli_status=ncli_status,
- automatic_repair=automatic_repair
+ automatic_repair=automatic_repair,
)
statsd_client = get_statsd_client()
- statsd_client.incr(f'admin.skaled_pipeline.{mon.__name__}.{no_hyphens(name)}')
- with statsd_client.timer(f'admin.skaled_pipeline.duration.{no_hyphens(name)}'):
+ statsd_client.incr(f'admin.skaled_pipeline.{mon.__name__}.{no_hyphens(schain_name)}')
+ with statsd_client.timer(f'admin.skaled_pipeline.duration.{no_hyphens(schain_name)}'):
mon(skaled_am, skaled_checks).run()
-def post_monitor_sleep():
- schain_monitor_sleep = random.randint(
- MIN_SCHAIN_MONITOR_SLEEP_INTERVAL, MAX_SCHAIN_MONITOR_SLEEP_INTERVAL
- )
- logger.info('Monitor iteration completed, sleeping for %d', schain_monitor_sleep)
- time.sleep(schain_monitor_sleep)
+class SkaledTask(ITask):
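+    """Task running the skaled pipeline for a single sChain.
+
+    It is submitted by execute_tasks only while the needed property is True:
+    the chain record's config_version matches the current stream version and
+    this is not the very first sync_config_run pass. A run longer than
+    STUCK_TIMEOUT_SECONDS is treated as stuck.
+    """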
+ NAME = 'skaled'
+ STUCK_TIMEOUT_SECONDS = 3600 # 1 hour
+
+ def __init__(
+ self,
+ schain_name: str,
+ skale: Skale,
+ node_config: NodeConfig,
+ stream_version: str,
+ dutils: Optional[DockerUtils] = None,
+ ) -> None:
+ self.schain_name = schain_name
+ self.skale = skale
+ self.node_config = node_config
+ self.dutils = dutils
+ self._future = Future()
+ self._start_ts = 0
+ self.stream_version = stream_version
+
+ @property
+ def name(self) -> str:
+ return self.NAME
+
+ @property
+ def stuck_timeout(self) -> int:
+ return self.STUCK_TIMEOUT_SECONDS
+
+ @property
+ def future(self) -> Future:
+ return self._future
+
+ @future.setter
+ def future(self, value: Future) -> None:
+ self._future = value
+
+ @property
+ def start_ts(self) -> int:
+ return self._start_ts
+
+ @start_ts.setter
+ def start_ts(self, value: int) -> None:
+ self._start_ts = value
+
+ @property
+ def needed(self) -> bool:
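+        # Inverse of the condition in start_tasks that removes the skaled
+        # config: skaled runs only once an up-to-date config is in place.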
+ schain_record = upsert_schain_record(self.schain_name)
+ return schain_record.config_version == self.stream_version and (
+ not schain_record.sync_config_run or not schain_record.first_run
+ )
+ def create_pipeline(self) -> Callable:
+ return functools.partial(
+ run_skaled_pipeline,
+ schain_name=self.schain_name,
+ skale=self.skale,
+ node_config=self.node_config,
+ dutils=self.dutils,
+ )
+
+
+class ConfigTask(ITask):
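+    """Task running the config pipeline for a single sChain.
+
+    It is needed on sync nodes and on nodes that are part of the chain. The
+    stuck timeout is derived from the on-chain DKG timeout multiplied by
+    DKG_TIMEOUT_COEFFICIENT.
+    """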
+ NAME = 'config'
+ STUCK_TIMEOUT_SECONDS = 60 * 60 * 2
+
+ def __init__(
+ self,
+ schain_name: str,
+ skale: Skale,
+ skale_ima: SkaleIma,
+ node_config: NodeConfig,
+ stream_version: str,
+ ) -> None:
+ self.schain_name = schain_name
+ self.skale = skale
+ self.skale_ima = skale_ima
+ self.node_config = node_config
+ self.stream_version = stream_version
+ self._start_ts = 0
+ self._future = Future()
+
+ @property
+ def name(self) -> str:
+ return self.NAME
+
+ @property
+ def future(self) -> Future:
+ return self._future
+
+ @future.setter
+ def future(self, value: Future) -> None:
+ self._future = value
+
+ @property
+ def stuck_timeout(self) -> int:
+ dkg_timeout = self.skale.constants_holder.get_dkg_timeout()
+ return int(dkg_timeout * DKG_TIMEOUT_COEFFICIENT)
+
+ @property
+ def start_ts(self) -> int:
+ return self._start_ts
+
+ @start_ts.setter
+ def start_ts(self, value: int) -> None:
+ self._start_ts = value
+
+ @property
+ def needed(self) -> bool:
+ return SYNC_NODE or is_node_part_of_chain(self.skale, self.schain_name, self.node_config.id)
+
+ def create_pipeline(self) -> Callable:
+ return functools.partial(
+ run_config_pipeline,
+ schain_name=self.schain_name,
+ skale=self.skale,
+ skale_ima=self.skale_ima,
+ node_config=self.node_config,
+ stream_version=self.stream_version,
+ )
-def create_and_execute_tasks(
- skale,
- schain,
+
+def start_tasks(
+ skale: Skale,
+ schain: SchainStructure,
node_config: NodeConfig,
skale_ima: SkaleIma,
- stream_version,
- schain_record,
- executor,
- futures,
- dutils,
-):
+ dutils: Optional[DockerUtils] = None,
+) -> bool:
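+    """Entry point of the per-sChain monitor process.
+
+    Writes the initial ProcessReport (pid and timestamp) used by the process
+    manager, prepares the chain record and, if the config stream changed,
+    removes the outdated skaled config, then hands a ConfigTask and a
+    SkaledTask over to execute_tasks.
+    """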
reload(web3_request)
+
name = schain.name
+ init_ts, pid = int(time.time()), os.getpid()
+    logger.info('Initializing process report. pid: %d, ts: %d', pid, init_ts)
+ process_report = ProcessReport(name)
+ process_report.update(pid, init_ts)
+
+ stream_version = get_skale_node_version()
+ schain_record = upsert_schain_record(name)
is_rotation_active = skale.node_rotation.is_rotation_active(name)
@@ -217,74 +337,26 @@ def create_and_execute_tasks(
statsd_client.incr(f'admin.schain.monitor.{no_hyphens(name)}')
statsd_client.gauge(f'admin.schain.monitor_last_seen.{no_hyphens(name)}', monitor_last_seen_ts)
- tasks = []
- if not leaving_chain:
- logger.info('Adding config task to the pool')
- tasks.append(
- Task(
- f'{name}-config',
- functools.partial(
- run_config_pipeline,
- skale=skale,
- skale_ima=skale_ima,
- schain=schain,
- node_config=node_config,
- stream_version=stream_version,
- ),
- sleep=CONFIG_PIPELINE_SLEEP,
- )
- )
if schain_record.config_version != stream_version or (
schain_record.sync_config_run and schain_record.first_run
):
+ logger.info('Fetching upstream config requested. Removing the old skaled config')
ConfigFileManager(name).remove_skaled_config()
- else:
- logger.info('Adding skaled task to the pool')
- tasks.append(
- Task(
- f'{name}-skaled',
- functools.partial(
- run_skaled_pipeline,
- skale=skale,
- schain=schain,
- node_config=node_config,
- dutils=dutils,
- ),
- sleep=SKALED_PIPELINE_SLEEP,
- )
- )
- if len(tasks) == 0:
- logger.warning('No tasks to run')
- keep_tasks_running(executor, tasks, futures)
-
-
-def run_monitor_for_schain(
- skale, skale_ima, node_config: NodeConfig, schain, dutils=None, once=False
-):
- stream_version = get_skale_node_version()
- tasks_number = 2
- with ThreadPoolExecutor(max_workers=tasks_number, thread_name_prefix='T') as executor:
- futures: List[Optional[Future]] = [None for i in range(tasks_number)]
- while True:
- schain_record = SChainRecord.get_by_name(schain.name)
- try:
- create_and_execute_tasks(
- skale,
- schain,
- node_config,
- skale_ima,
- stream_version,
- schain_record,
- executor,
- futures,
- dutils,
- )
- if once:
- return True
- post_monitor_sleep()
- except Exception:
- logger.exception('Monitor iteration failed')
- if once:
- return False
- post_monitor_sleep()
+ tasks = [
+ ConfigTask(
+ schain_name=schain.name,
+ skale=skale,
+ skale_ima=skale_ima,
+ node_config=node_config,
+ stream_version=stream_version,
+ ),
+ SkaledTask(
+ schain_name=schain.name,
+ skale=skale,
+ node_config=node_config,
+ stream_version=stream_version,
+ dutils=dutils
+ ),
+ ]
+ execute_tasks(tasks=tasks, process_report=process_report)
diff --git a/core/schains/monitor/skaled_monitor.py b/core/schains/monitor/skaled_monitor.py
index a946ca2be..a0fe5c99b 100644
--- a/core/schains/monitor/skaled_monitor.py
+++ b/core/schains/monitor/skaled_monitor.py
@@ -55,6 +55,8 @@ def run(self):
self.am._upd_schain_record()
self.am.log_executed_blocks()
self.am._upd_last_seen()
+ except Exception as e:
+ logger.info('Skaled monitor type failed %s', typename, exc_info=e)
finally:
logger.info('Skaled monitor type finished %s', typename)
diff --git a/core/schains/monitor/tasks.py b/core/schains/monitor/tasks.py
new file mode 100644
index 000000000..7fcc86f82
--- /dev/null
+++ b/core/schains/monitor/tasks.py
@@ -0,0 +1,86 @@
+import abc
+import logging
+import time
+from concurrent.futures import Future, ThreadPoolExecutor
+from typing import Callable
+
+from core.schains.process import ProcessReport
+
+
+logger = logging.getLogger(__name__)
+
+
+SLEEP_INTERVAL_SECONDS = 10
+
+
+class ITask(metaclass=abc.ABCMeta):
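+    """Interface for a long-running monitor task scheduled by execute_tasks.
+
+    Implementations expose a name, a stuck_timeout in seconds, a needed flag
+    telling the scheduler whether the task should be (re)submitted, and a
+    create_pipeline() factory returning the callable to run. The future and
+    start_ts properties are managed by execute_tasks itself.
+    """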
+ @property
+ @abc.abstractmethod
+ def name(self) -> str:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def stuck_timeout(self) -> int:
+ pass
+
+ @abc.abstractmethod
+ def create_pipeline(self) -> Callable:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def future(self) -> Future:
+ pass
+
+ @future.setter
+ @abc.abstractmethod
+ def future(self, value: Future) -> None:
+ pass
+
+    @property
+    @abc.abstractmethod
+    def needed(self) -> bool:
+        pass
+
+ @property
+ @abc.abstractmethod
+ def start_ts(self) -> int:
+ pass
+
+ @start_ts.setter
+ @abc.abstractmethod
+ def start_ts(self, value: int) -> None:
+ pass
+
+
+def execute_tasks(
+ tasks: list[ITask],
+ process_report: ProcessReport,
+ sleep_interval: int = SLEEP_INTERVAL_SECONDS,
+) -> None:
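+    """Keep the given tasks running in a thread pool until one gets stuck.
+
+    On every iteration each task that is not running and reports needed=True
+    is (re)submitted. A task whose future has been running for longer than
+    its stuck_timeout is treated as stuck: the executor is shut down,
+    process_report.ts is reset to 0 so the process manager treats the monitor
+    process as stale, and the loop exits. Otherwise process_report.ts is
+    refreshed as a heartbeat after every iteration.
+    """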
+ logger.info('Running tasks %s', tasks)
+ with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix='T') as executor:
+ stucked = []
+ while True:
+ for index, task in enumerate(tasks):
+ if not task.future.running() and task.needed and len(stucked) == 0:
+ task.start_ts = int(time.time())
+ logger.info('Starting task %s at %d', task.name, task.start_ts)
+ pipeline = task.create_pipeline()
+ task.future = executor.submit(pipeline)
+ elif task.future.running():
+ if int(time.time()) - task.start_ts > task.stuck_timeout:
+ logger.info('Canceling future for %s', task.name)
+ canceled = task.future.cancel()
+ if not canceled:
+ logger.warning('Stuck detected for job %s', task.name)
+ task.start_ts = -1
+ stucked.append(task.name)
+ time.sleep(sleep_interval)
+            if len(stucked) > 0:
+                logger.info('Stuck tasks detected, shutting down the executor')
+                executor.shutdown(wait=False)
+                logger.info('Aborting execution. Stuck tasks: %s', stucked)
+ process_report.ts = 0
+ break
+ process_report.ts = int(time.time())
diff --git a/core/schains/process.py b/core/schains/process.py
new file mode 100644
index 000000000..1da149995
--- /dev/null
+++ b/core/schains/process.py
@@ -0,0 +1,145 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of SKALE Admin
+#
+# Copyright (C) 2021 SKALE Labs
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import json
+import logging
+import os
+import shutil
+import signal
+from typing import Tuple
+
+import pathlib
+import psutil
+
+from tools.configs.schains import SCHAINS_DIR_PATH
+from tools.helper import check_pid
+
+
+logger = logging.getLogger(__name__)
+
+P_KILL_WAIT_TIMEOUT = 60
+
+
+def is_schain_process_report_exist(schain_name: str) -> bool:
+ path = pathlib.Path(SCHAINS_DIR_PATH).joinpath(schain_name, ProcessReport.REPORT_FILENAME)
+ return path.is_file()
+
+
+def get_schain_process_info(schain_name: str) -> Tuple[int | None, int | None]:
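+    """Return (pid, ts) from the sChain process report, or (None, None) if it is missing."""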
+ report = ProcessReport(schain_name)
+    if not report.is_exist():
+ return None, None
+ else:
+ return report.pid, report.ts
+
+
+class ProcessReport:
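+    """Pid/heartbeat report of an sChain monitor process.
+
+    The report is a small JSON file (process.json) inside the sChain
+    directory. Updates are first written to a temporary file and then moved
+    over the report, so readers never observe a partially written file.
+
+    Usage sketch:
+        report = ProcessReport(schain_name)
+        report.update(pid=os.getpid(), ts=int(time.time()))
+    """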
+ REPORT_FILENAME = 'process.json'
+
+ def __init__(self, name: str) -> None:
+ self.path = pathlib.Path(SCHAINS_DIR_PATH).joinpath(name, self.REPORT_FILENAME)
+ self.path.parent.mkdir(parents=True, exist_ok=True)
+
+ def is_exist(self) -> bool:
+ return os.path.isfile(self.path)
+
+ @property
+ def ts(self) -> int:
+ return self.read()['ts']
+
+ @ts.setter
+ def ts(self, value: int) -> None:
+ report = {}
+ if self.is_exist():
+ report = self.read()
+ report['ts'] = value
+ self._save_tmp(report)
+ self._move()
+
+ @property
+ def pid(self) -> int:
+ return self.read()['pid']
+
+ @pid.setter
+ def pid(self, value: int) -> None:
+ report = {}
+ if self.is_exist():
+ report = self.read()
+ report['pid'] = value
+ self._save_tmp(report)
+ self._move()
+
+ @property
+    def _tmp_path(self) -> pathlib.Path:
+ return self.path.with_stem('.tmp.' + self.path.stem)
+
+ def read(self) -> dict:
+ with open(self.path) as process_file:
+ data = json.load(process_file)
+ return data
+
+ def _save_tmp(self, report: dict) -> None:
+ with open(self._tmp_path, 'w') as tmp_file:
+ json.dump(report, tmp_file)
+
+    def _move(self) -> None:
+ if os.path.isfile(self._tmp_path):
+ shutil.move(self._tmp_path, self.path)
+
+ def update(self, pid: int, ts: int) -> None:
+ report = {'pid': pid, 'ts': ts}
+ self._save_tmp(report=report)
+ self._move()
+
+ def cleanup(self) -> None:
+ os.remove(self.path)
+
+
+def terminate_process(
+ pid: int,
+ kill_timeout: int = P_KILL_WAIT_TIMEOUT,
+ log_msg: str = ''
+) -> None:
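+    """Send SIGTERM to the process and wait for it to exit.
+
+    If the process does not exit within kill_timeout seconds it is killed.
+    A zero pid is ignored; all errors are logged instead of being raised.
+    """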
+ log_prefix = f'pid: {pid} - '
+
+ if log_msg != '':
+ log_prefix += f'{log_msg} - '
+ if pid == 0:
+ logger.warning(f'{log_prefix} - pid is 0, skipping')
+ return
+ try:
+ logger.warning(f'{log_prefix} - going to terminate')
+ p = psutil.Process(pid)
+ os.kill(p.pid, signal.SIGTERM)
+ p.wait(timeout=kill_timeout)
+ logger.info(f'{log_prefix} was terminated')
+ except psutil.NoSuchProcess:
+ logger.info(f'{log_prefix} - no such process')
+ except psutil.TimeoutExpired:
+        logger.warning(f'{log_prefix} - timeout expired, going to kill')
+ p.kill()
+ logger.info(f'{log_prefix} - process was killed')
+ except Exception:
+ logger.exception(f'{log_prefix} - termination failed!')
+ return
+
+
+def is_monitor_process_alive(monitor_pid: int) -> bool:
+ """Checks that provided monitor_id is inited and alive"""
+ return monitor_pid != 0 and check_pid(monitor_pid)
diff --git a/core/schains/process_manager.py b/core/schains/process_manager.py
index 01cf10e1a..e14857a34 100644
--- a/core/schains/process_manager.py
+++ b/core/schains/process_manager.py
@@ -17,44 +17,30 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import sys
import logging
-from typing import Dict
+import time
from multiprocessing import Process
+from typing import Optional
-from skale import Skale
+from skale import Skale, SkaleIma
+from skale.contracts.manager.schains import SchainStructure
-from core.schains.monitor.main import run_monitor_for_schain
+from core.node_config import NodeConfig
+from core.schains.monitor.main import start_tasks
from core.schains.notifications import notify_if_not_enough_balance
-from core.schains.process_manager_helper import (
- terminate_stuck_schain_process, is_monitor_process_alive, terminate_process
+from core.schains.process import (
+ get_schain_process_info,
+ is_monitor_process_alive,
+ terminate_process
)
-from web.models.schain import upsert_schain_record, SChainRecord
from tools.str_formatters import arguments_list_string
-
+from tools.configs.schains import DKG_TIMEOUT_COEFFICIENT
logger = logging.getLogger(__name__)
-def pm_signal_handler(*args):
- """
- This function is trigerred when SIGTERM signal is received by the main process of the app.
- The purpose of the process manager signal handler is to forward SIGTERM signal to all sChain
- processes so they can gracefully save DKG results before
- """
- records = SChainRecord.select()
- print(f'schain_records: {len(records)}')
- print(f'schain_records: {records}')
- for r in records:
- logger.warning(f'Sending SIGTERM to {r.name}, {r.monitor_id}')
- terminate_process(r.monitor_id)
- logger.warning('All sChain processes stopped, exiting...')
- sys.exit(0)
-
-
-def run_process_manager(skale, skale_ima, node_config):
- # signal.signal(signal.SIGTERM, pm_signal_handler)
+def run_process_manager(skale: Skale, skale_ima: SkaleIma, node_config: NodeConfig) -> None:
logger.info('Process manager started')
node_id = node_config.id
node_info = node_config.all()
@@ -66,30 +52,36 @@ def run_process_manager(skale, skale_ima, node_config):
logger.info('Process manager procedure finished')
-def run_pm_schain(skale, skale_ima, node_config, schain: Dict) -> None:
- schain_record = upsert_schain_record(schain.name)
- log_prefix = f'sChain {schain.name} -' # todo - move to logger formatter
-
- terminate_stuck_schain_process(skale, schain_record, schain)
- monitor_process_alive = is_monitor_process_alive(schain_record.monitor_id)
+def run_pm_schain(
+ skale: Skale,
+ skale_ima: SkaleIma,
+ node_config: NodeConfig,
+ schain: SchainStructure,
+ timeout: Optional[int] = None,
+) -> None:
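+    """Ensure that a healthy monitor process exists for the given sChain.
+
+    If a monitor process is alive but its ProcessReport heartbeat is older
+    than the allowed limit (timeout, or the on-chain DKG timeout multiplied
+    by DKG_TIMEOUT_COEFFICIENT), the process is terminated so that a fresh
+    one can be spawned on the next iteration. If no live process is found,
+    a new one running start_tasks is started.
+    """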
+ log_prefix = f'sChain {schain.name} -'
- if not monitor_process_alive:
- logger.info(f'{log_prefix} PID {schain_record.monitor_id} is not running, spawning...')
+ if timeout is not None:
+ allowed_diff = timeout
+ else:
+ dkg_timeout = skale.constants_holder.get_dkg_timeout()
+        allowed_diff = int(dkg_timeout * DKG_TIMEOUT_COEFFICIENT)
+
+ pid, pts = get_schain_process_info(schain.name)
+ if pid is not None and is_monitor_process_alive(pid):
+ if int(time.time()) - pts > allowed_diff:
+ logger.info('%s Terminating process: PID = %d', log_prefix, pid)
+ terminate_process(pid)
+ else:
+ logger.info('%s Process is running: PID = %d', log_prefix, pid)
+ else:
process = Process(
name=schain.name,
- target=run_monitor_for_schain,
- args=(
- skale,
- skale_ima,
- node_config,
- schain
- )
+ target=start_tasks,
+ args=(skale, schain, node_config, skale_ima)
)
process.start()
- schain_record.set_monitor_id(process.ident)
- logger.info(f'{log_prefix} Process started: PID = {process.ident}')
- else:
- logger.info(f'{log_prefix} Process is running: PID = {schain_record.monitor_id}')
+ logger.info('Process started for %s', schain.name)
def fetch_schains_to_monitor(skale: Skale, node_id: int) -> list:
@@ -103,9 +95,16 @@ def fetch_schains_to_monitor(skale: Skale, node_id: int) -> list:
active_schains = list(filter(lambda schain: schain.active, schains))
schains_holes = len(schains) - len(active_schains)
logger.info(
- arguments_list_string({'Node ID': node_id, 'sChains on node': active_schains,
- 'Number of sChains on node': len(active_schains),
- 'Empty sChain structs': schains_holes}, 'Monitoring sChains'))
+ arguments_list_string(
+ {
+ 'Node ID': node_id,
+ 'sChains on node': active_schains,
+ 'Number of sChains on node': len(active_schains),
+ 'Empty sChain structs': schains_holes,
+ },
+ 'Monitoring sChains',
+ )
+ )
return active_schains
diff --git a/core/schains/process_manager_helper.py b/core/schains/process_manager_helper.py
deleted file mode 100644
index 954e6be80..000000000
--- a/core/schains/process_manager_helper.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# This file is part of SKALE Admin
-#
-# Copyright (C) 2021 SKALE Labs
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-
-import logging
-import os
-import signal
-from datetime import datetime
-
-import psutil
-
-
-from tools.helper import check_pid
-
-
-logger = logging.getLogger(__name__)
-
-TIMEOUT_COEFFICIENT = 2.2
-P_KILL_WAIT_TIMEOUT = 60
-
-
-def terminate_stuck_schain_process(skale, schain_record, schain):
- """
- This function terminates the process if last_seen time is less than
- DKG timeout * TIMEOUT_COEFFICIENT
- """
- allowed_last_seen_time = _calc_allowed_last_seen_time(skale)
- if not schain_record.monitor_last_seen:
- logging.warning(f'schain: {schain.name}, monitor_last_seen is None, skipping...')
- return
- schain_monitor_last_seen = schain_record.monitor_last_seen.timestamp()
- if allowed_last_seen_time > schain_monitor_last_seen:
- logger.warning(f'schain: {schain.name}, pid {schain_record.monitor_id} last seen is \
-{schain_monitor_last_seen}, while max allowed last_seen is {allowed_last_seen_time}, pid \
-{schain_record.monitor_id} will be terminated now!')
- terminate_schain_process(schain_record)
-
-
-def terminate_schain_process(schain_record):
- log_msg = f'schain: {schain_record.name}'
- terminate_process(schain_record.monitor_id, log_msg=log_msg)
-
-
-def terminate_process(pid, kill_timeout=P_KILL_WAIT_TIMEOUT, log_msg=''):
- log_prefix = f'pid: {pid} - '
- if log_msg != '':
- log_prefix += f'{log_msg} - '
- if pid == 0:
- logger.warning(f'{log_prefix} - pid is 0, skipping')
- return
- try:
- logger.warning(f'{log_prefix} - going to terminate')
- p = psutil.Process(pid)
- os.kill(p.pid, signal.SIGTERM)
- p.wait(timeout=kill_timeout)
- logger.info(f'{log_prefix} was terminated')
- except psutil.NoSuchProcess:
- logger.info(f'{log_prefix} - no such process')
- except psutil.TimeoutExpired:
- logger.warning(f'{log_prefix} - timout expired, going to kill')
- p.kill()
- logger.info(f'{log_prefix} - process was killed')
- except Exception:
- logging.exception(f'{log_prefix} - termination failed!')
-
-
-def is_monitor_process_alive(monitor_id):
- """Checks that provided monitor_id is inited and alive"""
- return monitor_id != 0 and check_pid(monitor_id)
-
-
-def _calc_allowed_last_seen_time(skale):
- dkg_timeout = skale.constants_holder.get_dkg_timeout()
- allowed_diff = int(dkg_timeout * TIMEOUT_COEFFICIENT)
- logger.info(f'dkg_timeout: {dkg_timeout}, TIMEOUT_COEFFICIENT: {TIMEOUT_COEFFICIENT}, \
-allowed_diff: {allowed_diff}')
- return datetime.now().timestamp() - allowed_diff
diff --git a/core/schains/task.py b/core/schains/task.py
deleted file mode 100644
index b95a8eb92..000000000
--- a/core/schains/task.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import logging
-import time
-from concurrent.futures import Future, ThreadPoolExecutor
-from typing import Callable, List, Optional
-
-logger = logging.getLogger(__name__)
-
-
-class Task:
- def __init__(
- self,
- name: str,
- action: Callable,
- index: int = 0,
- sleep: int = 2
- ) -> None:
- self.name = name
- self.index = index
- self.action = action
- self.sleep = sleep
-
- def run(self) -> None:
- try:
- self.action()
- except Exception as e:
- logger.exception('Task %s failed with %s', self.name, e)
- logger.info('Sleeping after task execution for %d', self.sleep)
- time.sleep(self.sleep)
-
-
-def keep_tasks_running(
- executor: ThreadPoolExecutor,
- tasks: List[Task],
- futures: List[Optional[Future]]
-) -> None:
- for i, task in enumerate(tasks):
- future = futures[i]
- if future is not None and not future.running():
- result = future.result()
- logger.info('Task %s finished with %s', task.name, result)
- if future is None or not future.running():
- logger.info('Running task %s', task.name)
- futures[i] = executor.submit(task.run)
-
-
-def run_tasks(name: str, tasks: List[Task]) -> None:
- with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix='T') as executor:
- futures: List[Optional[Future]] = [None for i in range(len(tasks))]
- while True:
- keep_tasks_running(executor, tasks, futures)
- time.sleep(30)
diff --git a/gunicorn.conf.py b/gunicorn.conf.py
index e301f4843..51958e107 100644
--- a/gunicorn.conf.py
+++ b/gunicorn.conf.py
@@ -1,4 +1,4 @@
bind = "127.0.0.1:3007"
workers = 2
timeout = 1000
-loglevel = "debug"
+loglevel = "info"
diff --git a/scripts/helper.sh b/scripts/helper.sh
index c58a84519..7d35bec01 100644
--- a/scripts/helper.sh
+++ b/scripts/helper.sh
@@ -32,11 +32,9 @@ export_test_env () {
tests_cleanup () {
export_test_env
- docker rm -f skale_schain_test && docker volume rm test || true
- sudo rm -r tests/skale-data/lib || true
+ rm -r tests/skale-data/lib || true
rm tests/skale-data/node_data/node_config.json || true
docker rm -f sgx-simulator || true
- docker rm -f skale_schain_test1 skale_schain_test2 skale_schain_test3 || true
find . -name \*.pyc -delete || true
mkdir -p $SGX_CERTIFICATES_FOLDER || true
rm -rf $SGX_CERTIFICATES_FOLDER/sgx.* || true
diff --git a/tests/routes/schains_test.py b/tests/routes/schains_test.py
index 9395005dd..fdede93dd 100644
--- a/tests/routes/schains_test.py
+++ b/tests/routes/schains_test.py
@@ -139,8 +139,8 @@ def test_get_schain(
def test_schain_containers_versions(skale_bp):
- expected_skaled_version = '3.16.1'
- expected_ima_version = '2.0.0-beta.9'
+ expected_skaled_version = '3.19.0'
+ expected_ima_version = '2.1.0'
data = get_bp_data(skale_bp, get_api_url(
BLUEPRINT_NAME, 'container-versions'))
assert data == {
diff --git a/tests/schains/monitor/action/skaled_action_test.py b/tests/schains/monitor/action/skaled_action_test.py
index 970dc04cc..7f66943cf 100644
--- a/tests/schains/monitor/action/skaled_action_test.py
+++ b/tests/schains/monitor/action/skaled_action_test.py
@@ -287,7 +287,7 @@ def test_ima_container_action_from_scratch(
container_name = containers[0].name
assert container_name == f'skale_ima_{skaled_am.name}'
image = dutils.get_container_image_name(container_name)
- assert image == 'skalenetwork/ima:2.0.0-beta.9'
+ assert image == 'skalenetwork/ima:2.1.0'
# @pytest.mark.skip('Docker API GA issues need to be resolved')
@@ -309,8 +309,8 @@ def test_ima_container_action_image_pulling(
container_name = containers[0].name
assert container_name == f'skale_ima_{skaled_am.name}'
image = dutils.get_container_image_name(container_name)
- assert image == 'skalenetwork/ima:2.0.0-develop.3'
- assert dutils.pulled('skalenetwork/ima:2.0.0-beta.9')
+ assert image == 'skalenetwork/ima:2.1.0-beta.3'
+ assert dutils.pulled('skalenetwork/ima:2.1.0')
def test_ima_container_action_image_migration(
@@ -330,7 +330,7 @@ def test_ima_container_action_image_migration(
container_name = containers[0].name
assert container_name == f'skale_ima_{skaled_am.name}'
image = dutils.get_container_image_name(container_name)
- assert image == 'skalenetwork/ima:2.0.0-beta.9'
+ assert image == 'skalenetwork/ima:2.1.0'
def test_ima_container_action_time_frame_migration(
diff --git a/tests/schains/monitor/main_test.py b/tests/schains/monitor/main_test.py
index f637b4c96..242fbe43e 100644
--- a/tests/schains/monitor/main_test.py
+++ b/tests/schains/monitor/main_test.py
@@ -1,17 +1,33 @@
-import mock
-from concurrent.futures import ThreadPoolExecutor
+import functools
+import logging
+import os
+import pathlib
+import shutil
+import time
+from concurrent.futures import Future
+from typing import Callable
+from unittest import mock
import pytest
from core.schains.firewall.types import IpRange
from core.schains.firewall.utils import get_sync_agent_ranges
-from core.schains.monitor.main import run_monitor_for_schain
-from core.schains.task import Task
-
+from core.schains.process import ProcessReport
+from core.schains.monitor.main import ConfigTask, SkaledTask
+from core.schains.monitor.tasks import execute_tasks, ITask
+from tools.configs.schains import SCHAINS_DIR_PATH
from tools.helper import is_node_part_of_chain
from web.models.schain import upsert_schain_record
-from tests.utils import get_schain_struct
+
+@pytest.fixture
+def tmp_dir(_schain_name):
+ path = os.path.join(SCHAINS_DIR_PATH, _schain_name)
+ pathlib.Path(path).mkdir()
+ try:
+ yield path
+ finally:
+ shutil.rmtree(path, ignore_errors=True)
@pytest.fixture
@@ -30,7 +46,7 @@ def test_get_sync_agent_ranges(skale, sync_ranges):
ranges = get_sync_agent_ranges(skale)
assert ranges == [
IpRange(start_ip='127.0.0.1', end_ip='127.0.0.2'),
- IpRange(start_ip='127.0.0.5', end_ip='127.0.0.7')
+ IpRange(start_ip='127.0.0.5', end_ip='127.0.0.7'),
]
@@ -51,46 +67,122 @@ def test_is_node_part_of_chain(skale, schain_on_contracts, node_config):
assert not chain_on_node
-def test_run_monitor_for_schain(
- skale,
- skale_ima,
- schain_on_contracts,
- node_config,
- schain_db,
- dutils
-):
- with mock.patch('core.schains.monitor.main.keep_tasks_running') as keep_tasks_running_mock:
- run_monitor_for_schain(
- skale,
- skale_ima,
- node_config,
- get_schain_struct(schain_name=schain_db),
- dutils=dutils,
- once=True
- )
- assert isinstance(keep_tasks_running_mock.call_args[0][0], ThreadPoolExecutor)
- assert isinstance(keep_tasks_running_mock.call_args[0][1][0], Task)
- assert isinstance(keep_tasks_running_mock.call_args[0][1][1], Task)
- assert keep_tasks_running_mock.call_args[0][2] == [None, None]
-
-
-def test_run_monitor_for_schain_left(
- skale,
- skale_ima,
- node_config,
- schain_db,
- dutils
-):
- schain_not_exists = 'not-on-node'
- upsert_schain_record(schain_not_exists)
- with mock.patch('core.schains.monitor.main.is_node_part_of_chain', return_value=False):
- with mock.patch('core.schains.monitor.main.keep_tasks_running') as keep_tasks_running_mock:
- run_monitor_for_schain(
- skale,
- skale_ima,
- node_config,
- get_schain_struct(schain_name=schain_not_exists),
- dutils=dutils,
- once=True
- )
- keep_tasks_running_mock.assert_not_called()
+def test_config_task(skale, skale_ima, schain_db, schain_on_contracts, node_config):
+ stream_version = '2.3.0'
+ config_task = ConfigTask(
+ schain_name=schain_on_contracts,
+ skale=skale,
+ skale_ima=skale_ima,
+ node_config=node_config,
+ stream_version=stream_version,
+ )
+ assert config_task.needed
+ skale_ima.linker.has_schain = mock.Mock(return_value=True)
+
+ def get_monitor_mock(*args, **kwargs):
+ result = mock.MagicMock()
+ result.__name__ = 'TestConfigMonitor'
+ return result
+
+ with mock.patch('core.schains.monitor.main.RegularConfigMonitor', get_monitor_mock):
+ pipeline = config_task.create_pipeline()
+ pipeline()
+
+
+def test_skaled_task(skale, schain_db, schain_on_contracts, node_config, dutils):
+ record = upsert_schain_record(schain_on_contracts)
+ stream_version = '2.3.0'
+ skaled_task = SkaledTask(
+ schain_name=schain_on_contracts,
+ skale=skale,
+ node_config=node_config,
+ stream_version=stream_version,
+ dutils=dutils,
+ )
+ assert not skaled_task.needed
+ assert skaled_task.name == 'skaled'
+ assert skaled_task.start_ts == 0
+ assert skaled_task.stuck_timeout == 3600
+
+ record.set_config_version(stream_version)
+ assert skaled_task.needed
+
+ def get_monitor_mock(*args, **kwargs):
+ result = mock.MagicMock()
+ result.__name__ = 'TestSkaledMonitor'
+ return result
+
+ with mock.patch('core.schains.monitor.main.get_skaled_monitor', get_monitor_mock):
+ with mock.patch('core.schains.monitor.main.notify_checks'):
+ pipeline = skaled_task.create_pipeline()
+ pipeline()
+
+
+def test_execute_tasks(tmp_dir, _schain_name):
+ def run_stuck_pipeline(index: int) -> None:
+ logging.info('Running stuck pipeline %d', index)
+ iterations = 7
+ for i in range(iterations):
+ logging.info('Stuck pipeline %d beat', index)
+ time.sleep(1)
+
+ class StuckedTask(ITask):
+ def __init__(self, index) -> None:
+ self._name = 'stucked-task'
+ self.index = index
+ self._stuck_timeout = 3
+ self._start_ts = 0
+ self._future = Future()
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def future(self) -> Future:
+ return self._future
+
+ @future.setter
+ def future(self, value: Future) -> None:
+ self._future = value
+
+ @property
+ def start_ts(self) -> int:
+ return self._start_ts
+
+ @start_ts.setter
+ def start_ts(self, value: int) -> None:
+ print(f'Updating start_ts {self} {value}')
+ self._start_ts = value
+
+ @property
+ def task_name(self) -> str:
+            return self._name
+
+ @property
+ def stuck_timeout(self) -> int:
+ return self._stuck_timeout
+
+ @property
+ def needed(self) -> bool:
+ return True
+
+ def create_pipeline(self) -> Callable:
+ return functools.partial(run_stuck_pipeline, index=self.index)
+
+ class NotNeededTask(StuckedTask):
+ def __init__(self, index: int) -> None:
+ super().__init__(index=index)
+ self._name = 'not-needed-task'
+
+ @property
+ def needed(self) -> bool:
+ return False
+
+ process_report = ProcessReport(name=_schain_name)
+ tasks = [StuckedTask(0), NotNeededTask(1)]
+ execute_tasks(tasks=tasks, process_report=process_report, sleep_interval=1)
+
+ print(tasks[0], tasks[1])
+ assert tasks[0].start_ts == -1
+ assert tasks[1].start_ts == 0
diff --git a/tests/schains/monitor/process_test.py b/tests/schains/monitor/process_test.py
new file mode 100644
index 000000000..d5b495d03
--- /dev/null
+++ b/tests/schains/monitor/process_test.py
@@ -0,0 +1,32 @@
+import os
+import shutil
+import time
+from pathlib import Path
+
+import pytest
+
+from core.schains.process import ProcessReport
+
+from tools.configs.schains import SCHAINS_DIR_PATH
+
+
+@pytest.fixture
+def tmp_dir(_schain_name):
+ path = os.path.join(SCHAINS_DIR_PATH, _schain_name)
+ Path(path).mkdir()
+ try:
+ yield path
+ finally:
+ shutil.rmtree(path, ignore_errors=True)
+
+
+def test_process_report(_schain_name, tmp_dir):
+ report = ProcessReport(_schain_name)
+ with pytest.raises(FileNotFoundError):
+ assert report.ts == 0
+
+ ts = int(time.time())
+ pid = 10
+ report.update(pid=pid, ts=ts)
+ assert report.ts == ts
+ assert report.pid == pid
diff --git a/tests/schains/process_manager_test.py b/tests/schains/process_manager_test.py
new file mode 100644
index 000000000..ab215cc68
--- /dev/null
+++ b/tests/schains/process_manager_test.py
@@ -0,0 +1,108 @@
+import mock
+import logging
+import os
+import pathlib
+import shutil
+import time
+
+import psutil
+import pytest
+
+from core.schains.process import ProcessReport, terminate_process
+from core.schains.process_manager import run_pm_schain
+from tools.configs.schains import SCHAINS_DIR_PATH
+from tests.utils import get_schain_struct
+
+logger = logging.getLogger(__name__)
+
+MAX_ITERATIONS = 100
+
+
+@pytest.fixture
+def tmp_dir(_schain_name):
+ path = os.path.join(SCHAINS_DIR_PATH, _schain_name)
+ pathlib.Path(path).mkdir()
+ try:
+ yield path
+ finally:
+ shutil.rmtree(path, ignore_errors=True)
+
+
+def target_regular_mock(*args, **kwargs):
+ schain_name = args[1].name
+ process_report = ProcessReport(schain_name)
+ process_report.update(os.getpid(), int(time.time()))
+ logger.info('Starting regular test task runner')
+ iterations = 5
+ for i in range(iterations):
+ process_report.ts = int(time.time())
+ logger.info('Regular test task runner beat %s', i)
+ time.sleep(1)
+
+
+def target_stuck_mock(*args, **kwargs):
+    schain_name = args[1].name
+    ProcessReport(schain_name).update(os.getpid(), int(time.time()))
+ logger.info('Starting stucked test task runner')
+ iterations = 10000
+ for i in range(iterations):
+ logger.info('Stuck test task runner beat %s', i)
+ time.sleep(1)
+
+
+def wait_for_process_report(process_report):
+ wait_it = 0
+ while wait_it < MAX_ITERATIONS and not process_report.is_exist():
+ time.sleep(0.5)
+ wait_it += 1
+ assert process_report.is_exist()
+
+
+def test_run_pm_schain(tmp_dir, skale, skale_ima, node_config, _schain_name):
+ schain = get_schain_struct(schain_name=_schain_name)
+
+ timeout = 7
+
+ with mock.patch('core.schains.process_manager.start_tasks', target_regular_mock):
+ run_pm_schain(skale, skale_ima, node_config, schain, timeout=timeout)
+
+ process_report = ProcessReport(schain.name)
+ wait_for_process_report(process_report)
+
+ pid = process_report.pid
+
+ try:
+ assert psutil.Process(pid).is_running()
+ start_ts = int(time.time())
+
+ while int(time.time()) - start_ts < 2 * timeout:
+ time.sleep(1)
+ assert psutil.Process(pid).status() not in ('dead', 'stopped')
+ finally:
+ pid = ProcessReport(_schain_name).pid
+ terminate_process(pid)
+
+ old_pid = pid
+ wait_it = 0
+ while wait_it < MAX_ITERATIONS and process_report.pid == old_pid:
+ time.sleep(0.5)
+ wait_it += 1
+
+ with mock.patch('core.schains.process_manager.start_tasks', target_stuck_mock):
+ run_pm_schain(skale, skale_ima, node_config, schain, timeout=timeout)
+
+ start_ts = int(time.time())
+
+ while int(time.time()) - start_ts < 2 * timeout:
+ try:
+ psutil.Process(pid).is_running()
+ except psutil.NoSuchProcess:
+ break
+ time.sleep(1)
+
+ try:
+ with pytest.raises(psutil.NoSuchProcess):
+ psutil.Process(pid).is_running()
+ finally:
+ pid = ProcessReport(_schain_name).pid
+ terminate_process(pid)
diff --git a/tests/schains/task_test.py b/tests/schains/task_test.py
deleted file mode 100644
index f5c574094..000000000
--- a/tests/schains/task_test.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import functools
-import time
-
-import pytest
-
-from core.schains.task import run_tasks, Task
-
-ITERATIONS = 10
-SCHAINS_NUM = 10
-
-
-class StopActionError(Exception):
- pass
-
-
-def action(name):
- for i in range(ITERATIONS):
- time.sleep(2)
- raise StopActionError(f'Stopping {name}')
-
-
-@pytest.mark.skip
-def test_tasks():
- tasks = [
- Task(
- f'test-schain-{i}',
- functools.partial(action, name=f'test-schain-{i}'),
- i
- )
- for i in range(SCHAINS_NUM)
- ]
- run_tasks(tasks=tasks)
- time.sleep(3)
diff --git a/tests/skale-data/config/containers.json b/tests/skale-data/config/containers.json
index 3561d2539..41f3f514e 100644
--- a/tests/skale-data/config/containers.json
+++ b/tests/skale-data/config/containers.json
@@ -1,7 +1,7 @@
{
"schain": {
"name": "skalenetwork/schain",
- "version": "3.16.1",
+ "version": "3.19.0",
"custom_args": {
"ulimits_list": [
{
@@ -31,8 +31,8 @@
},
"ima": {
"name": "skalenetwork/ima",
- "version": "2.0.0-develop.3",
- "new_version": "2.0.0-beta.9",
+ "version": "2.1.0-beta.3",
+ "new_version": "2.1.0",
"custom_args": {},
"args": {
"restart_policy": {
diff --git a/tools/configs/logs.py b/tools/configs/logs.py
index d21c8da41..0d9205d81 100644
--- a/tools/configs/logs.py
+++ b/tools/configs/logs.py
@@ -42,10 +42,10 @@
REMOVED_CONTAINERS_FOLDER_NAME
)
-LOG_FILE_SIZE_MB = 40
+LOG_FILE_SIZE_MB = 100
LOG_FILE_SIZE_BYTES = LOG_FILE_SIZE_MB * 1000000
-LOG_BACKUP_COUNT = 10
+LOG_BACKUP_COUNT = 20
ADMIN_LOG_FORMAT = '[%(asctime)s %(levelname)s][%(process)d][%(processName)s][%(threadName)s] - %(name)s:%(lineno)d - %(message)s' # noqa
API_LOG_FORMAT = '[%(asctime)s] %(process)d %(levelname)s %(url)s %(module)s: %(message)s' # noqa
diff --git a/tools/configs/schains.py b/tools/configs/schains.py
index f08338d45..3341799da 100644
--- a/tools/configs/schains.py
+++ b/tools/configs/schains.py
@@ -54,3 +54,5 @@
RPC_CHECK_TIMEOUT_STEP = 10
MAX_CONSENSUS_STORAGE_INF_VALUE = 1000000000000000000
+
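+# Multiplier applied to the on-chain DKG timeout to get the maximum allowed
+# gap between monitor process heartbeats before the process is restarted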
+DKG_TIMEOUT_COEFFICIENT = 2.2
diff --git a/web/models/schain.py b/web/models/schain.py
index 14a023a7a..5f92e45ca 100644
--- a/web/models/schain.py
+++ b/web/models/schain.py
@@ -17,12 +17,13 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+import functools
import logging
-import threading
+import time
from datetime import datetime
from peewee import (CharField, DateTimeField,
- IntegrityError, IntegerField, BooleanField)
+ IntegrityError, IntegerField, BooleanField, OperationalError)
from core.schains.dkg.structures import DKGStatus
from web.models.base import BaseModel
@@ -30,10 +31,31 @@
logger = logging.getLogger(__name__)
DEFAULT_CONFIG_VERSION = '0.0.0'
+RETRY_ATTEMPTS = 5
+TIMEOUTS = [2 ** p for p in range(RETRY_ATTEMPTS)]
+
+
+def operational_error_retry(func):
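+    """Retry an SChainRecord classmethod on peewee OperationalError.
+
+    The wrapped call is retried up to RETRY_ATTEMPTS times with exponential
+    backoff (1, 2, 4, ... seconds); the last error is re-raised if every
+    attempt fails.
+    """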
+ @functools.wraps(func)
+ def wrapper(cls, *args, **kwargs):
+ result, error = None, None
+ for i, timeout in enumerate(TIMEOUTS):
+ try:
+ result = func(cls, *args, **kwargs)
+ except OperationalError as e:
+ logger.error('DB operational error. Sleeping %d', timeout, exc_info=e)
+ error = e
+ time.sleep(timeout)
+ else:
+ error = None
+ break
+ if error is not None:
+ raise error
+ return result
+ return wrapper
class SChainRecord(BaseModel):
- _lock = threading.Lock()
name = CharField(unique=True)
added_at = DateTimeField()
dkg_status = IntegerField()
@@ -72,6 +94,7 @@ def add(cls, name):
return (None, err)
@classmethod
+ @operational_error_retry
def get_by_name(cls, name):
return cls.get(cls.name == name)
@@ -109,10 +132,6 @@ def to_dict(cls, record):
'failed_rpc_count': record.failed_rpc_count
}
- def upload(self, *args, **kwargs) -> None:
- with SChainRecord._lock:
- self.save(*args, **kwargs)
-
def dkg_started(self):
self.set_dkg_status(DKGStatus.IN_PROGRESS)
@@ -128,66 +147,66 @@ def dkg_done(self):
def set_dkg_status(self, val: DKGStatus) -> None:
logger.info(f'Changing DKG status for {self.name} to {val.name}')
self.dkg_status = val
- self.upload()
+ self.save()
def set_deleted(self):
self.is_deleted = True
- self.upload()
+ self.save()
def set_first_run(self, val):
logger.info(f'Changing first_run for {self.name} to {val}')
self.first_run = val
- self.upload(only=[SChainRecord.first_run])
+ self.save(only=[SChainRecord.first_run])
def set_backup_run(self, val):
logger.info(f'Changing backup_run for {self.name} to {val}')
self.backup_run = val
- self.upload(only=[SChainRecord.backup_run])
+ self.save(only=[SChainRecord.backup_run])
def set_repair_mode(self, value):
logger.info(f'Changing repair_mode for {self.name} to {value}')
self.repair_mode = value
- self.upload()
+ self.save()
def set_new_schain(self, value):
logger.info(f'Changing new_schain for {self.name} to {value}')
self.new_schain = value
- self.upload()
+ self.save()
def set_needs_reload(self, value):
logger.info(f'Changing needs_reload for {self.name} to {value}')
self.needs_reload = value
- self.upload()
+ self.save()
def set_monitor_last_seen(self, value):
logger.info(f'Changing monitor_last_seen for {self.name} to {value}')
self.monitor_last_seen = value
- self.upload()
+ self.save()
def set_monitor_id(self, value):
logger.info(f'Changing monitor_id for {self.name} to {value}')
self.monitor_id = value
- self.upload()
+ self.save()
def set_config_version(self, value):
logger.info(f'Changing config_version for {self.name} to {value}')
self.config_version = value
- self.upload()
+ self.save()
def set_restart_count(self, value: int) -> None:
logger.info(f'Changing restart count for {self.name} to {value}')
self.restart_count = value
- self.upload()
+ self.save()
def set_failed_rpc_count(self, value: int) -> None:
logger.info(f'Changing failed rpc count for {self.name} to {value}')
self.failed_rpc_count = value
- self.upload()
+ self.save()
def set_snapshot_from(self, value: str) -> None:
logger.info(f'Changing snapshot from for {self.name} to {value}')
self.snapshot_from = value
- self.upload()
+ self.save()
def reset_failed_counters(self) -> None:
logger.info(f'Resetting failed counters for {self.name}')
@@ -205,7 +224,7 @@ def is_dkg_done(self) -> bool:
def set_sync_config_run(self, value):
logger.info(f'Changing sync_config_run for {self.name} to {value}')
self.sync_config_run = value
- self.upload()
+ self.save()
def is_dkg_unsuccessful(self) -> bool:
return self.dkg_status in [