Fix empty error logs
badrogger committed Nov 8, 2024
1 parent 9e7ea8f commit 993c9ec
Showing 3 changed files with 55 additions and 37 deletions.
53 changes: 32 additions & 21 deletions core/schains/monitor/main.py
@@ -66,19 +66,23 @@ def run_config_pipeline(
node_config: NodeConfig,
stream_version: str,
) -> None:
logger.info('Gathering initial skale manager data')
schain = skale.schains.get_by_name(schain_name)
schain_record = SChainRecord.get_by_name(schain_name)
rotation_data = skale.node_rotation.get_rotation(schain_name)
allowed_ranges = get_sync_agent_ranges(skale)
ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(schain_name)
group_index = skale.schains.name_to_group_id(schain_name)
last_dkg_successful = skale.dkg.is_last_dkg_successful(group_index)
current_nodes = get_current_nodes(skale, schain_name)

logger.info('Initing schain record')
schain_record = SChainRecord.get_by_name(schain_name)

estate = ExternalState(
ima_linked=ima_linked, chain_id=skale_ima.web3.eth.chain_id, ranges=allowed_ranges
)
econfig = ExternalConfig(schain_name)
logger.info('Initing config checks')
config_checks = ConfigChecks(
schain_name=schain_name,
node_id=node_config.id,
@@ -91,6 +95,7 @@ def run_config_pipeline(
estate=estate,
)

logger.info('Initing config action manager')
config_am = ConfigActionManager(
skale=skale,
schain=schain,
@@ -103,8 +108,9 @@ def run_config_pipeline(
econfig=econfig,
)

logger.info('Gathering config status')
status = config_checks.get_all(log=False, expose=True)
logger.info('Config checks: %s', status)
logger.info('Config status: %s', status)

if SYNC_NODE:
logger.info(
@@ -128,14 +134,15 @@ def run_skaled_pipeline(
schain_name: str, skale: Skale, node_config: NodeConfig, dutils: DockerUtils
) -> None:
schain = skale.schains.get_by_name(schain_name)
logger.info('Initing schain record')
schain_record = SChainRecord.get_by_name(schain_name)

logger.info('Record: %s', SChainRecord.to_dict(schain_record))

dutils = dutils or DockerUtils()

rc = get_default_rule_controller(name=schain_name)
logger.info('Initing skaled checks manager')
logger.info('Initing skaled checks')
skaled_checks = SkaledChecks(
schain_name=schain.name,
schain_record=schain_record,
@@ -159,7 +166,7 @@ def run_skaled_pipeline(
econfig=ExternalConfig(schain_name),
dutils=dutils,
)
logger.info('Fetching skaled checks')
logger.info('Gathering skaled status')
check_status = skaled_checks.get_all(log=False, expose=True)
logger.info('Get automatic repair option')
automatic_repair = get_automatic_repair_option()
@@ -237,14 +244,16 @@ def needed(self) -> bool:
not schain_record.sync_config_run or not schain_record.first_run
)

def create_pipeline(self) -> Callable:
return functools.partial(
run_skaled_pipeline,
schain_name=self.schain_name,
skale=self.skale,
node_config=self.node_config,
dutils=self.dutils,
)
def run(self) -> None:
try:
run_skaled_pipeline(
schain_name=self.schain_name,
skale=self.skale,
node_config=self.node_config,
dutils=self.dutils,
)
except Exception:
logger.exception('Task %s failed', self.name)


class ConfigTask(ITask):
@@ -296,15 +305,17 @@ def start_ts(self, value: int) -> None:
def needed(self) -> bool:
return SYNC_NODE or is_node_part_of_chain(self.skale, self.schain_name, self.node_config.id)

def create_pipeline(self) -> Callable:
return functools.partial(
run_config_pipeline,
schain_name=self.schain_name,
skale=self.skale,
skale_ima=self.skale_ima,
node_config=self.node_config,
stream_version=self.stream_version,
)
def run(self) -> None:
try:
run_config_pipeline(
schain_name=self.schain_name,
skale=self.skale,
skale_ima=self.skale_ima,
node_config=self.node_config,
stream_version=self.stream_version,
)
except Exception:
logger.exception('Task %s failed', self.name)


def start_tasks(
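The substance of the fix is visible in the main.py hunks above: each task used to hand the executor a bare functools.partial pipeline, so an exception raised inside it was captured by the returned Future and never reached the logs unless someone called future.result(). The new run() methods wrap the pipeline call in try/except and report failures with logger.exception, which writes the message together with the full traceback. A minimal, self-contained sketch of the difference (illustrative names only, not the repository's code):

import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def pipeline() -> None:
    raise RuntimeError('boom')


def run_unwrapped() -> None:
    # Old pattern: the exception is stored inside the Future object;
    # nothing is logged unless future.result() is ever called.
    pipeline()


def run_wrapped() -> None:
    # New pattern: the task reports its own failure, traceback included.
    try:
        pipeline()
    except Exception:
        logger.exception('Task %s failed', 'demo')


with ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(run_unwrapped)  # fails silently -> nothing in the logs
    executor.submit(run_wrapped)    # logs 'Task demo failed' with a traceback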
29 changes: 19 additions & 10 deletions core/schains/monitor/tasks.py
@@ -25,7 +25,7 @@ def stuck_timeout(self) -> int:
pass

@abc.abstractmethod
def create_pipeline(self) -> Callable:
def run(self) -> None:
pass

@property
@@ -58,29 +58,38 @@ def execute_tasks(
process_report: ProcessReport,
sleep_interval: int = SLEEP_INTERVAL_SECONDS,
) -> None:
logger.info('Running tasks %s', tasks)
with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix='T') as executor:
logger.info("Running tasks %s", tasks)
with ThreadPoolExecutor(max_workers=len(tasks), thread_name_prefix="T") as executor:
stucked = []
while True:
for index, task in enumerate(tasks):
logger.info(
"Status of %s, running: %s needed: %s stucked: %s",
task.name,
task.future.running(),
task.needed,
len(stucked),
)
if not task.future.running() and task.needed and len(stucked) == 0:
if task.future.done():
logger.info('Done')
logger.info('Result %s', task.future.result())
task.start_ts = int(time.time())
logger.info('Starting task %s at %d', task.name, task.start_ts)
pipeline = task.create_pipeline()
task.future = executor.submit(pipeline)
logger.info("Starting task %s at %d", task.name, task.start_ts)
task.future = executor.submit(task.run)
elif task.future.running():
if int(time.time()) - task.start_ts > task.stuck_timeout:
logger.info('Canceling future for %s', task.name)
logger.info("Canceling future for %s", task.name)
canceled = task.future.cancel()
if not canceled:
logger.warning('Stuck detected for job %s', task.name)
logger.warning("Stuck detected for job %s", task.name)
task.start_ts = -1
stucked.append(task.name)
time.sleep(sleep_interval)
if len(stucked) > 0:
logger.info('Sleeping before subverting execution')
logger.info("Sleeping before subverting execution")
executor.shutdown(wait=False)
logger.info('Subverting execution. Stucked %s', stucked)
logger.info("Subverting execution. Stucked %s", stucked)
process_report.ts = 0
break
process_report.ts = int(time.time())
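In execute_tasks the loop now submits the task's bound run method directly (executor.submit(task.run)) instead of first asking the task for a pipeline callable. A bound method is already a zero-argument callable, so no functools.partial wrapper is needed; a small sketch of that idea with a hypothetical task class, not the repository's:

from concurrent.futures import ThreadPoolExecutor


class DemoTask:
    name = 'demo'

    def run(self) -> None:
        # Stand-in for run_config_pipeline / run_skaled_pipeline.
        print(f'{self.name}: pipeline finished')


task = DemoTask()
with ThreadPoolExecutor(max_workers=1, thread_name_prefix='T') as executor:
    # task.run is a bound method, i.e. a ready-to-use zero-argument callable,
    # so it can be handed to submit() as-is.
    future = executor.submit(task.run)
    future.result()  # waits for completion; prints 'demo: pipeline finished'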
10 changes: 4 additions & 6 deletions tests/schains/monitor/main_test.py
@@ -85,8 +85,7 @@ def get_monitor_mock(*args, **kwargs):
return result

with mock.patch('core.schains.monitor.main.RegularConfigMonitor', get_monitor_mock):
pipeline = config_task.create_pipeline()
pipeline()
config_task.run()


def test_skaled_task(skale, schain_db, schain_on_contracts, node_config, dutils):
@@ -114,8 +113,7 @@ def get_monitor_mock(*args, **kwargs):

with mock.patch('core.schains.monitor.main.get_skaled_monitor', get_monitor_mock):
with mock.patch('core.schains.monitor.main.notify_checks'):
pipeline = skaled_task.create_pipeline()
pipeline()
skaled_task.run()


def test_execute_tasks(tmp_dir, _schain_name):
@@ -167,8 +165,8 @@ def stuck_timeout(self) -> int:
def needed(self) -> bool:
return True

def create_pipeline(self) -> Callable:
return functools.partial(run_stuck_pipeline, index=self.index)
def run(self) -> None:
run_stuck_pipeline(index=self.index)

class NotNeededTask(StuckedTask):
def __init__(self, index: int) -> None:
