From 993c9ec1eb55391f1928e0742d86b359f4e60bc2 Mon Sep 17 00:00:00 2001 From: badrogger Date: Fri, 8 Nov 2024 18:36:25 +0000 Subject: [PATCH] Fix empty error logs --- core/schains/monitor/main.py | 53 ++++++++++++++++++------------ core/schains/monitor/tasks.py | 29 ++++++++++------ tests/schains/monitor/main_test.py | 10 +++--- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/core/schains/monitor/main.py b/core/schains/monitor/main.py index 54ffbefe..5bf9b5db 100644 --- a/core/schains/monitor/main.py +++ b/core/schains/monitor/main.py @@ -66,8 +66,8 @@ def run_config_pipeline( node_config: NodeConfig, stream_version: str, ) -> None: + logger.info('Gathering initial skale manager data') schain = skale.schains.get_by_name(schain_name) - schain_record = SChainRecord.get_by_name(schain_name) rotation_data = skale.node_rotation.get_rotation(schain_name) allowed_ranges = get_sync_agent_ranges(skale) ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(schain_name) @@ -75,10 +75,14 @@ def run_config_pipeline( last_dkg_successful = skale.dkg.is_last_dkg_successful(group_index) current_nodes = get_current_nodes(skale, schain_name) + logger.info('Initing schain record') + schain_record = SChainRecord.get_by_name(schain_name) + estate = ExternalState( ima_linked=ima_linked, chain_id=skale_ima.web3.eth.chain_id, ranges=allowed_ranges ) econfig = ExternalConfig(schain_name) + logger.info('Initing config checks') config_checks = ConfigChecks( schain_name=schain_name, node_id=node_config.id, @@ -91,6 +95,7 @@ def run_config_pipeline( estate=estate, ) + logger.info('Initing config action manager') config_am = ConfigActionManager( skale=skale, schain=schain, @@ -103,8 +108,9 @@ def run_config_pipeline( econfig=econfig, ) + logger.info('Gathering config status') status = config_checks.get_all(log=False, expose=True) - logger.info('Config checks: %s', status) + logger.info('Config status: %s', status) if SYNC_NODE: logger.info( @@ -128,6 +134,7 @@ def run_skaled_pipeline( schain_name: str, skale: Skale, node_config: NodeConfig, dutils: DockerUtils ) -> None: schain = skale.schains.get_by_name(schain_name) + logger.info('Initing schain record') schain_record = SChainRecord.get_by_name(schain_name) logger.info('Record: %s', SChainRecord.to_dict(schain_record)) @@ -135,7 +142,7 @@ def run_skaled_pipeline( dutils = dutils or DockerUtils() rc = get_default_rule_controller(name=schain_name) - logger.info('Initing skaled checks manager') + logger.info('Initing skaled checks') skaled_checks = SkaledChecks( schain_name=schain.name, schain_record=schain_record, @@ -159,7 +166,7 @@ def run_skaled_pipeline( econfig=ExternalConfig(schain_name), dutils=dutils, ) - logger.info('Fetching skaled checks') + logger.info('Gathering skaled status') check_status = skaled_checks.get_all(log=False, expose=True) logger.info('Get automatic repair option') automatic_repair = get_automatic_repair_option() @@ -237,14 +244,16 @@ def needed(self) -> bool: not schain_record.sync_config_run or not schain_record.first_run ) - def create_pipeline(self) -> Callable: - return functools.partial( - run_skaled_pipeline, - schain_name=self.schain_name, - skale=self.skale, - node_config=self.node_config, - dutils=self.dutils, - ) + def run(self) -> None: + try: + run_skaled_pipeline( + schain_name=self.schain_name, + skale=self.skale, + node_config=self.node_config, + dutils=self.dutils, + ) + except Exception: + logger.exception('Task %s failed', self.name) class ConfigTask(ITask): @@ -296,15 +305,17 @@ def start_ts(self, value: int) -> None: def needed(self) -> bool: return SYNC_NODE or is_node_part_of_chain(self.skale, self.schain_name, self.node_config.id) - def create_pipeline(self) -> Callable: - return functools.partial( - run_config_pipeline, - schain_name=self.schain_name, - skale=self.skale, - skale_ima=self.skale_ima, - node_config=self.node_config, - stream_version=self.stream_version, - ) + def run(self) -> None: + try: + run_config_pipeline( + schain_name=self.schain_name, + skale=self.skale, + skale_ima=self.skale_ima, + node_config=self.node_config, + stream_version=self.stream_version, + ) + except Exception: + logger.exception('Task %s failed', self.name) def start_tasks( diff --git a/core/schains/monitor/tasks.py b/core/schains/monitor/tasks.py index 7fcc86f8..079242b7 100644 --- a/core/schains/monitor/tasks.py +++ b/core/schains/monitor/tasks.py @@ -25,7 +25,7 @@ def stuck_timeout(self) -> int: pass @abc.abstractmethod - def create_pipeline(self) -> Callable: + def run(self) -> None: pass @property @@ -58,29 +58,38 @@ def execute_tasks( process_report: ProcessReport, sleep_interval: int = SLEEP_INTERVAL_SECONDS, ) -> None: - logger.info('Running tasks %s', tasks) - with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix='T') as executor: + logger.info("Running tasks %s", tasks) + with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix="T") as executor: stucked = [] while True: for index, task in enumerate(tasks): + logger.info( + "Status of %s, running: %s needed: %s stucked: %s", + task.name, + task.future.running(), + task.needed, + len(stucked), + ) if not task.future.running() and task.needed and len(stucked) == 0: + if task.future.done(): + logger.info('Done') + logger.info('Result %s', task.future.result()) task.start_ts = int(time.time()) - logger.info('Starting task %s at %d', task.name, task.start_ts) - pipeline = task.create_pipeline() - task.future = executor.submit(pipeline) + logger.info("Starting task %s at %d", task.name, task.start_ts) + task.future = executor.submit(task.run) elif task.future.running(): if int(time.time()) - task.start_ts > task.stuck_timeout: - logger.info('Canceling future for %s', task.name) + logger.info("Canceling future for %s", task.name) canceled = task.future.cancel() if not canceled: - logger.warning('Stuck detected for job %s', task.name) + logger.warning("Stuck detected for job %s", task.name) task.start_ts = -1 stucked.append(task.name) time.sleep(sleep_interval) if len(stucked) > 0: - logger.info('Sleeping before subverting execution') + logger.info("Sleeping before subverting execution") executor.shutdown(wait=False) - logger.info('Subverting execution. Stucked %s', stucked) + logger.info("Subverting execution. Stucked %s", stucked) process_report.ts = 0 break process_report.ts = int(time.time()) diff --git a/tests/schains/monitor/main_test.py b/tests/schains/monitor/main_test.py index 242fbe43..aa15a51c 100644 --- a/tests/schains/monitor/main_test.py +++ b/tests/schains/monitor/main_test.py @@ -85,8 +85,7 @@ def get_monitor_mock(*args, **kwargs): return result with mock.patch('core.schains.monitor.main.RegularConfigMonitor', get_monitor_mock): - pipeline = config_task.create_pipeline() - pipeline() + config_task.run() def test_skaled_task(skale, schain_db, schain_on_contracts, node_config, dutils): @@ -114,8 +113,7 @@ def get_monitor_mock(*args, **kwargs): with mock.patch('core.schains.monitor.main.get_skaled_monitor', get_monitor_mock): with mock.patch('core.schains.monitor.main.notify_checks'): - pipeline = skaled_task.create_pipeline() - pipeline() + skaled_task.run() def test_execute_tasks(tmp_dir, _schain_name): @@ -167,8 +165,8 @@ def stuck_timeout(self) -> int: def needed(self) -> bool: return True - def create_pipeline(self) -> Callable: - return functools.partial(run_stuck_pipeline, index=self.index) + def run(self) -> None: + run_stuck_pipeline(index=self.index) class NotNeededTask(StuckedTask): def __init__(self, index: int) -> None: