diff --git a/iceprod/scheduled_tasks/removed_tasks.py b/iceprod/scheduled_tasks/removed_tasks.py deleted file mode 100644 index d457fe398..000000000 --- a/iceprod/scheduled_tasks/removed_tasks.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -Update pilots when tasks change state via a user. -""" - -import argparse -import asyncio -import logging - -import requests.exceptions - -from iceprod.client_auth import add_auth_to_argparse, create_rest_client - -logger = logging.getLogger('removed_tasks') - - -async def run(rest_client, debug=False): - """ - Actual runtime / loop. - - Args: - rest_client (:py:class:`iceprod.core.rest_client.Client`): rest client - debug (bool): debug flag to propagate exceptions - """ - try: - async def test_pilot(pilot): - new_tasks = [] - for task_id in pilot['tasks']: - ret = await rest_client.request('GET', f'/tasks/{task_id}') - if ret['status'] == 'processing': - new_tasks.append(task_id) - if new_tasks != pilot['tasks']: - args = {'tasks': new_tasks} - try: - await rest_client.request('PATCH', f'/pilots/{pilot["pilot_id"]}', args) - except requests.exceptions.HTTPError as e: - if e.response.status_code != 404: - raise - - awaitables = set() - pilots = await rest_client.request('GET', '/pilots?keys=pilot_id|tasks') - for p in pilots.values(): - if 'tasks' in p and p['tasks']: - awaitables.add(asyncio.create_task(test_pilot(p))) - if len(awaitables) >= 40: - done,pending = await asyncio.wait(awaitables, return_when=asyncio.FIRST_COMPLETED) - awaitables = pending - for fut in done: - await fut - for fut in asyncio.as_completed(awaitables): - await fut - - except Exception: - logger.error('error updating pilot for removed tasks', exc_info=True) - if debug: - raise - - -def main(): - parser = argparse.ArgumentParser(description='run a scheduled task once') - add_auth_to_argparse(parser) - parser.add_argument('--log-level', default='info', help='log level') - parser.add_argument('--debug', default=False, action='store_true', help='debug enabled') - - args = parser.parse_args() - - logformat = '%(asctime)s %(levelname)s %(name)s %(module)s:%(lineno)s - %(message)s' - logging.basicConfig(format=logformat, level=getattr(logging, args.log_level.upper())) - - rest_client = create_rest_client(args) - asyncio.run(run(rest_client, debug=args.debug)) - - -if __name__ == '__main__': - main() diff --git a/iceprod/scheduled_tasks/reset_tasks.py b/iceprod/scheduled_tasks/reset_tasks.py deleted file mode 100644 index c7724a4a8..000000000 --- a/iceprod/scheduled_tasks/reset_tasks.py +++ /dev/null @@ -1,93 +0,0 @@ -""" -Move tasks from reset to waiting/failed/suspended. -""" - -import argparse -import asyncio -import logging - -from iceprod.client_auth import add_auth_to_argparse, create_rest_client - -logger = logging.getLogger('reset_tasks') - - -async def reset_dataset(dataset_id, rest_client=None, debug=False): - try: - dataset = await rest_client.request('GET', '/datasets/{}'.format(dataset_id)) - tasks = await rest_client.request('GET', '/datasets/{}/task_summaries/status'.format(dataset_id)) - if 'reset' in tasks: - logger.info('dataset %s reset tasks: %s', dataset_id, tasks['reset']) - for task_id in tasks['reset']: - try: - task = await rest_client.request('GET', '/tasks/{}'.format(task_id)) - status = 'waiting' - if dataset['debug']: - status = 'suspended' - elif 'failures' in task and task['failures'] > 10: - status = 'failed' - args = {'status': status} - await rest_client.request('PUT', '/datasets/{}/tasks/{}/status'.format(dataset_id,task_id), args) - except Exception: - logger.error('error resetting task %s', task_id, exc_info=True) - if debug: - raise - except Exception: - logger.error('error resetting tasks in dataset %s', dataset_id, exc_info=True) - if debug: - raise - - -async def run(rest_client, debug=False): - """ - Actual runtime / loop. - - Args: - rest_client (:py:class:`iceprod.core.rest_client.Client`): rest client - debug (bool): debug flag to propagate exceptions - """ - try: - datasets = await rest_client.request('GET', '/dataset_summaries/status') - dataset_ids = [] - if 'processing' in datasets: - dataset_ids.extend(datasets['processing']) - if 'truncated' in datasets: - dataset_ids.extend(datasets['truncated']) - awaitables = set() - for dataset_id in dataset_ids: - fut = asyncio.create_task(reset_dataset(dataset_id, rest_client=rest_client, debug=debug)) - awaitables.add(fut) - if len(awaitables) >= 20: - done, pending = await asyncio.wait(awaitables, return_when=asyncio.FIRST_COMPLETED) - for fut in done: - await fut - awaitables = pending - for fut in asyncio.as_completed(awaitables): - await fut - except Exception: - logger.error('error resetting non-active tasks', exc_info=True) - if debug: - raise - - -def main(): - parser = argparse.ArgumentParser(description='run a scheduled task once') - add_auth_to_argparse(parser) - parser.add_argument('--dataset-id', default=None, help='dataset id to reset') - parser.add_argument('--log-level', default='info', help='log level') - parser.add_argument('--debug', default=False, action='store_true', help='debug enabled') - - args = parser.parse_args() - - logformat = '%(asctime)s %(levelname)s %(name)s %(module)s:%(lineno)s - %(message)s' - logging.basicConfig(format=logformat, level=getattr(logging, args.log_level.upper())) - - rest_client = create_rest_client(args) - - if args.dataset_id: - asyncio.run(reset_dataset(args.dataset_id, rest_client, debug=args.debug)) - else: - asyncio.run(run(rest_client, debug=args.debug)) - - -if __name__ == '__main__': - main() diff --git a/iceprod/scheduled_tasks/update_task_priority.py b/iceprod/scheduled_tasks/update_task_priority.py index a07a828de..b056e2c70 100644 --- a/iceprod/scheduled_tasks/update_task_priority.py +++ b/iceprod/scheduled_tasks/update_task_priority.py @@ -24,7 +24,7 @@ async def run(rest_client, dataset_id=None, debug=False): prio = Priority(rest_client) try: args = { - 'status': 'waiting|queued|processing|reset', + 'status': 'idle|waiting', 'keys': 'task_id|depends|dataset_id', } if dataset_id: diff --git a/iceprod/server/plugins/condor.py b/iceprod/server/plugins/condor.py index 26f530b23..6b9d9c6d4 100644 --- a/iceprod/server/plugins/condor.py +++ b/iceprod/server/plugins/condor.py @@ -70,6 +70,7 @@ class JobStatus(enum.Enum): RESET_REASONS = [ + '_condor_stdout: (errno 2) No such file', 'sigterm', 'killed', 'transfer input files failure', diff --git a/iceprod/server/server.py b/iceprod/server/server.py index d09bd1063..c36e8153d 100644 --- a/iceprod/server/server.py +++ b/iceprod/server/server.py @@ -12,6 +12,7 @@ import os import sys +from iceprod import __version__ as version_string from iceprod.core.logger import set_log_level from iceprod.server.config import IceProdConfig from iceprod.server.queue import Queue @@ -34,6 +35,7 @@ def __init__(self, config_params=None, outfile=None, errfile=None): self.queue = Queue(self.cfg) set_log_level(self.cfg['logging']['level']) + logger.error('IceProd Server - version %s', version_string) async def rotate_logs(self): current_date = datetime.utcnow() diff --git a/tests/scheduled_tasks/reset_tasks_test.py b/tests/scheduled_tasks/reset_tasks_test.py deleted file mode 100644 index 74ed4786c..000000000 --- a/tests/scheduled_tasks/reset_tasks_test.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -Test script for scheduled_tasks/reset_tasks -""" - -import logging -from unittest.mock import AsyncMock, MagicMock - -import pytest -from iceprod.scheduled_tasks import reset_tasks - -logger = logging.getLogger('scheduled_tasks_reset_tasks_test') - - -async def test_200_run(): - rc = MagicMock() - pilots = {} - dataset_summaries = {'processing':['foo']} - tasks = {} - task = {} - async def client(method, url, args=None): - logger.info('REST: %s, %s', method, url) - if url.startswith('/dataset_summaries'): - return dataset_summaries - elif url.startswith('/datasets/foo/task_summaries'): - return tasks - elif url == '/datasets/foo': - return {'debug':False} - elif url == '/tasks/bar': - return task - elif url == '/datasets/foo/tasks/bar/status' and method == 'PUT': - client.called = True - return {} - else: - raise Exception() - client.called = False - rc.request = client - - await reset_tasks.run(rc, debug=True) - assert not client.called - - tasks['reset'] = ['bar'] - await reset_tasks.run(rc, debug=True) - assert client.called - - client.called = False - del dataset_summaries['processing'] - dataset_summaries['truncated'] = ['foo'] - await reset_tasks.run(rc, debug=True) - assert client.called - - -async def test_201_run(): - rc = MagicMock() - pilots = {'a':{}} - # try tasks error - async def client(method, url, args=None): - logger.info('REST: %s, %s', method, url) - if url.startswith('/dataset_summaries'): - return {'processing':['foo']} - else: - raise Exception() - rc.request = client - with pytest.raises(Exception): - await reset_tasks.run(rc, debug=True) - - # check it normally hides the error - await reset_tasks.run(rc, debug=False) - - # try dataset level error - rc.request = AsyncMock(side_effect=Exception()) - with pytest.raises(Exception): - await reset_tasks.run(rc, debug=True) - - # check it normally hides the error - await reset_tasks.run(rc, debug=False)