diff --git a/iceprod/core/i3exec.py b/iceprod/core/i3exec.py
index b1157d656..6b18687df 100644
--- a/iceprod/core/i3exec.py
+++ b/iceprod/core/i3exec.py
@@ -2,484 +2,103 @@
 The task runner. Run it with `python -m iceprod.core.i3exec`.
-
-optional arguments:
-  -h, --help            show this help message and exit
-  -f CFGFILE, --cfgfile CFGFILE
-                        Specify config file
-  -u URL, --url URL     URL of the iceprod server
-  -p PASSKEY, --passkey PASSKEY
-                        passkey for communcation with iceprod server
-  --pilot_id PILOTID    ID of the pilot (if this is a pilot)
-  -d, --debug           Enable debug actions and logging
-  --offline             Enable offline mode (don't talk with server)
-  --offline_transfer True/False
-                        Enable offline file transfer
-  --logfile LOGFILE     Specify the logfile to use
-  --job JOB             Index of the job to run
-  --task TASK           Name of the task to run
-  --gzip-logs           gzip the iceprod logs
 """
-from __future__ import absolute_import, division, print_function
-
+import argparse
+import asyncio
+import json
+import subprocess
 import os
 import logging
-import logging.config
 import time
-import socket
-from functools import partial
+import tempfile
+from pathlib import Path
 import shutil
-import threading
-import asyncio
-import gzip
 
 import iceprod
-from iceprod.core import constants
-import iceprod.core.dataclasses
-import iceprod.core.serialization
+from iceprod.client_auth import add_auth_to_argparse, create_rest_client
+import iceprod.core.config
 from iceprod.core.defaults import add_default_options
 import iceprod.core.exe
-from iceprod.core.exe_json import ServerComms
-import iceprod.core.pilot
-import iceprod.core.resources
 import iceprod.core.logger
 
 
-def load_config(cfgfile):
-    """Load a config from file, serialized string, dictionary, etc"""
-    logger = logging.getLogger('i3exec')
-    config = None
-    if isinstance(cfgfile,str):
-        try:
-            if os.path.exists(cfgfile):
-                config = iceprod.core.serialization.serialize_json.load(cfgfile)
-            else:
-                config = iceprod.core.serialization.serialize_json.loads(cfgfile)
-            if not config:
-                raise Exception('Config not found')
-        except Exception as e:
-            logger.critical('Error loading configuration: %s' % str(e))
-            raise
-    elif isinstance(cfgfile,iceprod.core.dataclasses.Job):
-        config = cfgfile
-    elif isinstance(cfgfile,dict):
-        config = iceprod.core.serialization.dict_to_dataclasses(cfgfile)
-    else:
-        logger.warning('cfgfile: %r',cfgfile)
-        raise Exception('cfgfile is not a str or a Job')
-    return config
-
-
-def main(cfgfile=None, logfile=None, url=None, debug=False,
-         passkey='', pilot_id=None, offline=False, offline_transfer=False,
-         gzip_logs=False):
-    """Main task runner for iceprod"""
-    # set up logger
-    if debug:
-        logl = 'INFO'
-    else:
-        logl = 'WARNING'
-    if logfile:
-        logf = os.path.abspath(os.path.expandvars(logfile))
-    else:
-        logf = os.path.abspath(os.path.expandvars(constants['stdlog']))
-    if gzip_logs:
-        logf += '.gz'
-    iceprod.core.logger.set_logger(loglevel=logl,
-                                   logfile=logf,
-                                   logsize=67108864,
-                                   lognum=1)
-    logging.warning('starting IceProd core')
-
-    if cfgfile is None:
-        logging.critical('There is no cfgfile')
-        raise Exception('missing cfgfile')
-    elif isinstance(cfgfile, str):
-        config = load_config(cfgfile)
-    else:
-        config = cfgfile
-    logging.info('config: %r',config)
-
-    if not offline:
-        # if we are not in offline mode, we need a url
-        if not url:
-            logging.critical('url missing')
-            raise Exception('url missing')
-
-        # setup server comms
-        kwargs = {}
-        if 'username' in config['options']:
-            kwargs['username'] = config['options']['username']
-        if 'password' in config['options']:
-            kwargs['password'] = config['options']['password']
-        if 'ssl' in config['options'] and config['options']['ssl']:
-            kwargs.update(config['options']['ssl'])
-        rpc = ServerComms(url, passkey, None, **kwargs)
-
-    async def run():
-        if offline:
-            logging.info('offline mode')
-            async for proc in runner(config, debug=debug, offline=offline,
-                                     offline_transfer=offline_transfer):
-                await proc.wait()
-        elif 'tasks' in config and config['tasks']:
-            logging.info('online mode - single task')
-            # tell the server that we are processing this task
-            if 'task_id' not in config['options']:
-                raise Exception('config["options"]["task_id"] not specified')
-            pilot_id2 = None
-            try:
-                pilot_id2 = await rpc.create_pilot(queue_host='manual',
-                                                   host=socket.getfqdn(),
-                                                   grid_queue_id='1',
-                                                   queue_version=iceprod.__version__,
-                                                   version=iceprod.__version__,
-                                                   tasks=[config['options']['task_id']],
-                                                   resources={})
-                await rpc.processing(config['options']['task_id'])
-            except Exception:
-                logging.error('comms error', exc_info=True)
-
-            # set up stdout and stderr
-            try:
-                async for proc in runner(config, rpc=rpc, debug=debug):
-                    await proc.wait()
-            finally:
-                try:
-                    if pilot_id2:
-                        await rpc.delete_pilot(pilot_id2)
-                except Exception:
-                    logging.error('comms error', exc_info=True)
-        else:
-            logging.info('pilot mode - get many tasks from server')
-            if 'gridspec' not in config['options']:
-                logging.critical('gridspec missing')
-                raise Exception('gridspec missing')
-            if not pilot_id:
-                logging.critical('pilot_id missing')
-                raise Exception('pilot_id missing')
-            pilot_kwargs = {}
-            if 'run_timeout' in config['options']:
-                pilot_kwargs['run_timeout'] = config['options']['run_timeout']
-            if 'restrict_site' in config['options']:
-                pilot_kwargs['restrict_site'] = config['options']['restrict_site']
-            async with iceprod.core.pilot.Pilot(
-                    config, rpc=rpc, debug=debug,
-                    runner=partial(runner, rpc=rpc, debug=debug),
-                    pilot_id=pilot_id, **pilot_kwargs) as p:
-                await p.run()
+logger = logging.getLogger('i3exec')
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(run())
-
-    logging.warning('finished running normally; exiting...')
-    iceprod.core.logger.remove_handlers()
 
+async def main():
+    # get arguments
+    parser = argparse.ArgumentParser(description='IceProd Core')
+    parser.add_argument('--log-level', default='info', help='log level')
+    add_auth_to_argparse(parser)
 
-async def runner(config, rpc=None, debug=False, offline=False,
-                 offline_transfer=False, resources=None):
-    """Run a config.
+    real = parser.add_argument_group('Real Dataset', 'Download from IceProd server')
+    real.add_argument('--dataset-id', help='IceProd dataset id')
+    real.add_argument('--task-id', help='IceProd task id')
 
-    #. Set some default options if not set in configuration.
-    #. Set up global env based on the configuration.
-    #. Run tasks
-       * If a task is specified in the configuration options:
+    testing = parser.add_argument_group('Testing')
+    testing.add_argument('--config', help='Specify config file')
+    testing.add_argument('--task', type=str, help='Name of the task to run')
+    testing.add_argument('--dataset-num', type=int, default=1, help='Fake dataset number (optional)')
+    testing.add_argument('--jobs-submitted', type=int, default=1, help='Total number of jobs in this dataset (optional)')
+    testing.add_argument('--job-num', type=int, default=1, help='Fake job index (optional)')
 
-         If the task is specified by name or number, run only that task.
-         If there is a problem finding the task specified, raise a
-         critical error.
+    args = parser.parse_args()
+    print(args)
 
-       * Otherwise, run all tasks in the configuration in the order
-         they were written.
+    iceprod.core.logger.set_logger(loglevel=args.log_level)
 
-    #. Destroy the global env, uploading and deleting files as needed.
-    #. Upload the log, error, and output files if specified in options.
+    if args.dataset_id:
+        if not args.task_id:
+            parser.error('task-id is required')
+        rc = create_rest_client(args)
+        logger.info('Real dataset mode: dataset %s task %s', args.dataset_id, args.task_id)
+        task = await iceprod.core.config.Task.load_from_api(args.dataset_id, args.task_id, rc)
 
-    Args:
-        config (`iceprod.core.dataclasses.Job`): Dataset configuration
-        rpc (:py:class:`iceprod.core.exe_json.ServerComms`): RPC object
-        debug (bool): (optional) turn on debug logging
-        offline (bool): (optional) enable offline mode
-        offline_transfer (bool): (optional) enable/disable offline data transfers
-        resources (:py:class:`iceprod.core.resources.Resources`): (optional) Resources object
-    """
-    # set logging
-    if offline:
-        logger = logging.getLogger('task')
-        if 'task_id' not in config['options']:
-            config['options']['task_id'] = 'a'
     else:
-        if 'task_id' not in config['options']:
-            raise Exception('task_id not set in config options')
-        logger = logging.getLogger(config['options']['task_id'])
-    if 'debug' not in config['options']:
-        config['options']['debug'] = debug
-    if ('debug' in config['options'] and config['options']['debug'] and
-            'loglevel' not in config['options']):
-        config['options']['loglevel'] = 'INFO'
-    if ('loglevel' in config['options'] and
-            config['options']['loglevel'].upper() in iceprod.core.logger.setlevel):
-        try:
-            iceprod.core.logger.set_log_level(config['options']['loglevel'])
-        except Exception:
-            logger.warning('failed to set a new log level', exc_info=True)
-
-    # make sure some basic options are set
-    add_default_options(config['options'])
-    if 'offline' not in config['options']:
-        config['options']['offline'] = offline
-    if 'offline_transfer' not in config['options']:
-        config['options']['offline_transfer'] = offline_transfer
-    if 'subprocess_dir' not in config['options']:
-        config['options']['subprocess_dir'] = os.getcwd()
-    if 'task_temp' not in config['options']:
-        config['options']['task_temp'] = 'file:'+os.path.join(config['options']['subprocess_dir'],'task_temp')
-    if 'tray_temp' not in config['options']:
-        config['options']['tray_temp'] = 'file:'+os.path.join(config['options']['subprocess_dir'],'tray_temp')
-    if 'local_temp' not in config['options']:
-        config['options']['local_temp'] = os.path.join(config['options']['subprocess_dir'],'local_temp')
-    if 'credentials_dir' not in config['options']:
-        config['options']['credentials_dir'] = os.path.join(os.getcwd(), 'iceprod_credentials')
-    if 'stillrunninginterval' not in config['options']:
-        config['options']['stillrunninginterval'] = 60
-    if 'upload' not in config['options']:
-        config['options']['upload'] = 'logging'
-
-    if not config['steering']:
-        # make sure steering exists in the config
-        config['steering'] = iceprod.core.dataclasses.Steering()
-
-    resource_thread = None
-    if not resources:
-        try:
-            import psutil
-        except ImportError:
-            resources = None
-        else:
-            # track resource usage in separate thread
-            resource_stop = False
-            resources = iceprod.core.resources.Resources(debug=debug)
-            config['options']['resources'] = resources.claim(config['options']['task_id'])
-            resources.register_process(config['options']['task_id'],
-                                       psutil.Process(), os.getcwd())
-
-            def track():
-                while not resource_stop:
-                    resources.check_claims()
-                    time.sleep(1)
-            resource_thread = threading.Thread(target=track)
-            resource_thread.start()
-
-    # make exe Config
-    cfg = iceprod.core.exe.Config(config=config, rpc=rpc, logger=logger)
-
-    # set up global env, based on config['options'] and config.steering
-    env_opts = cfg.parseObject(config['options'], {})
-    stats = {}
-    try:
-        try:
-            # keep track of the start time
-            start_time = time.time()
-            stats = {}
-
-            async with iceprod.core.exe.SetupEnv(cfg, config['steering'], {'options':env_opts}, logger=logger) as env:
-                logger.warning("config options: %r",config['options'])
-
-                # find tasks to run
-                if 'task' in config['options']:
-                    logger.warning('task specified: %r',config['options']['task'])
-                    # run only this task name or number
-                    name = config['options']['task']
-                    if isinstance(name, iceprod.core.dataclasses.String) and name.isdigit():
-                        name = int(name)
-                    if isinstance(name, iceprod.core.dataclasses.String):
-                        # find task by name
-                        for task in config['tasks']:
-                            if task['name'] == name:
-                                async for proc in iceprod.core.exe.runtask(cfg, env, task, logger=logger):
-                                    yield proc
-                                break
-                        else:
-                            logger.critical('cannot find task named %r', name)
-                            raise Exception('cannot find specified task')
-                    elif isinstance(name, int):
-                        # find task by index
-                        if name >= 0 and name < len(config['tasks']):
-                            async for proc in iceprod.core.exe.runtask(cfg, env, config['tasks'][name], logger=logger):
-                                yield proc
-                        else:
-                            logger.critical('cannot find task index %d', name)
-                            raise Exception('cannot find specified task')
-
-                    else:
-                        logger.critical('task specified in options is %r, but no task found',
-                                        name)
-                        raise Exception('cannot find specified task')
-                    stats = env['stats']
-                elif offline:
-                    # run all tasks in order
-                    for task in config['tasks']:
-                        async for proc in iceprod.core.exe.runtask(cfg, env, task):
-                            yield proc
-                else:
-                    raise Exception('task to run not specified')
-
-            if (not offline) and 'task' in config['options']:
-                # finish task
-                logger.warning('task finishing')
-                await rpc.finish_task(
-                    config['options']['task_id'],
-                    dataset_id=config['options']['dataset_id'],
-                    stats=stats, start_time=start_time,
-                    resources=resources.get_final(config['options']['task_id']) if resources else None,
-                    site=resources.site if resources else None,
-                )
-
-        except Exception as e:
-            logger.error('task failed, exiting without running completion steps.',
-                         exc_info=True)
-            # set task status on server
-            if not offline:
-                try:
-                    await rpc.task_error(
-                        config['options']['task_id'],
-                        dataset_id=config['options']['dataset_id'],
-                        stats=stats, start_time=start_time,
-                        reason=str(e),
-                        resources=resources.get_final(config['options']['task_id']) if resources else None,
-                        site=resources.site if resources else None,
-                    )
-                except Exception as e:
-                    logger.error(e)
-            # forcibly turn on logging, so we can see the error
-            config['options']['upload'] = 'logging'
-            raise
-
-    finally:
-        # check resources
-        if resource_thread:
-            resource_stop = True
-            resource_thread.join()
-            print('Resources:')
-            r = resources.get_final(config['options']['task_id'])
-            if not r:
-                print('  None')
-            else:
-                for k in r:
-                    print('  {}: {:.2f}'.format(k,r[k]))
-        # upload log files to server
-        try:
-            if (not offline) and 'upload' in config['options']:
-                logger.warning('uploading logfiles')
-                if isinstance(config['options']['upload'],
-                              iceprod.core.dataclasses.String):
-                    upload = config['options']['upload'].lower().split('|')
-                elif isinstance(config['options']['upload'],(tuple,list)):
-                    upload = [x.lower() for x in config['options']['upload']]
-                else:
-                    raise Exception('upload config is not a valid type')
-                errfile = constants['stderr']
-                outfile = constants['stdout']
-                if 'subprocess_dir' in config['options'] and config['options']['subprocess_dir']:
-                    subdir = config['options']['subprocess_dir']
-                    errfile = os.path.join(subdir, errfile)
-                    outfile = os.path.join(subdir, outfile)
-                for up in upload:
-                    if up.startswith('logging'):
-                        # upload err,log,out files
-                        await rpc.uploadLog(task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-                        await rpc.uploadErr(filename=errfile,
-                                            task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-                        await rpc.uploadOut(filename=outfile,
-                                            task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-                        break
-                    elif up.startswith('log'):
-                        # upload log files
-                        await rpc.uploadLog(task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-                    elif up.startswith('err'):
-                        # upload err files
-                        await rpc.uploadErr(filename=errfile,
-                                            task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-                    elif up.startswith('out'):
-                        # upload out files
-                        await rpc.uploadOut(filename=outfile,
-                                            task_id=config['options']['task_id'],
-                                            dataset_id=config['options']['dataset_id'])
-        except Exception:
-            logger.error('failed when uploading logging info',exc_info=True)
-
-    logger.warning('finished without error')
+        if not args.config:
+            parser.error('config is required')
+        if not args.task:
+            parser.error('task is required')
+
+        logger.info('Testing mode: dataset %d job %d task %s', args.dataset_num, args.job_num, args.task)
+        with open(args.config) as f:
+            cfg = json.load(f)
+
+        d = iceprod.core.config.Dataset(
+            dataset_id='datasetid',
+            dataset_num=args.dataset_num,
+            jobs_submitted=args.jobs_submitted,
+            tasks_submitted=args.jobs_submitted*len(cfg['tasks']),
+            tasks_per_job=len(cfg['tasks']),
+            status='processing',
+            priority=0,
+            group='group',
+            user='user',
+            debug=True,
+            config=cfg
+        )
+        task = iceprod.core.config.Task(
+            dataset=d,
+            job=iceprod.core.config.Job(d, '', args.job_num, 'processing'),
+            task_id='taskid',
+            task_index=-1,
+            name=args.task,
+            depends=[],
+            requirements={},
+            status='processing',
+            site='site',
+            stats={}
+        )
+
+    task.dataset.fill_defaults()
+    task.dataset.validate()
+
+    ws = iceprod.core.exe.WriteToScript(task, workdir=Path.cwd(), logger=logger)
+    scriptpath = await ws.convert()
+    logger.info('running script %s', scriptpath)
+    subprocess.run([scriptpath], check=True)
 
 
 if __name__ == '__main__':
-    # get arguments
-    import argparse
-    parser = argparse.ArgumentParser(description='IceProd Core')
-    parser.add_argument('-f','--cfgfile', type=str,
-                        help='Specify config file')
-    parser.add_argument('-u','--url', type=str,
-                        help='URL of the iceprod server')
-    parser.add_argument('-p','--passkey', type=str,
-                        help='passkey for communcation with iceprod server')
-    parser.add_argument('--pilot_id', type=str, default=None,
-                        help='ID of the pilot (if this is a pilot)')
-    parser.add_argument('-d','--debug', action='store_true', default=False,
-                        help='Enable debug actions and logging')
-    parser.add_argument('--offline', action='store_true', default=False,
-                        help='Enable offline mode (don\'t talk with server)')
-    parser.add_argument('--offline_transfer', type=bool, default=False,
-                        help='Enable/disable file transfer during offline mode')
-    parser.add_argument('--logfile', type=str, default=None,
-                        help='Specify the logfile to use')
-    parser.add_argument('--dataset', type=int, default=None,
-                        help='Dataset number')
-    parser.add_argument('--job', type=int, default=None,
-                        help='Index of the job to run')
-    parser.add_argument('--jobs_submitted', type=int, default=None,
-                        help='Total number of jobs in this dataset')
-    parser.add_argument('--task', type=str, default=None,
-                        help='Name of the task to run')
-    parser.add_argument('--gzip-logs', dest='gzip_logs', action='store_true',
-                        help='gzip the iceprod logs')
-
-    args = vars(parser.parse_args())
-    print(args)
-
-    # check cfgfile
-    if args['cfgfile'] is not None and not os.path.isfile(args['cfgfile']):
-        if os.path.isfile(os.path.join(os.getcwd(),args['cfgfile'])):
-            args['cfgfile'] = os.path.join(os.getcwd(),args['cfgfile'])
-        else:
-            args['cfgfile'] = None
-
-    options = {k: args.pop(k) for k in ('dataset','job','jobs_submitted','task')}
-    if not options['jobs_submitted'] and options['job']:
-        options['jobs_submitted'] = options['job']+1
-    options['debug'] = args['debug']
-    if args['cfgfile']:
-        cfgfile = load_config(args['cfgfile'])
-        for k in options:
-            if options[k] is not None and k not in cfgfile['options']:
-                cfgfile['options'][k] = options[k]
-        args['cfgfile'] = cfgfile
-
-    # start iceprod
-    try:
-        main(**args)
-    finally:
-        if args['gzip_logs']:
-            # compress stdout and stderr
-            for filename in ('stdout', 'stderr'):
-                if os.path.exists(constants[filename]):
-                    with open(constants[filename], 'rb') as f_in:
-                        with gzip.open(constants[filename]+'.gz', 'wb') as f_out:
-                            shutil.copyfileobj(f_in, f_out)
-                else:
-                    with gzip.open(constants[filename]+'.gz', 'wb') as f_out:
-                        pass
+    asyncio.run(main())
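For reference, a possible invocation of the two new modes, based only on the arguments defined above (the config file name and task name are hypothetical examples, and any authentication flags added by add_auth_to_argparse are omitted):

    # testing mode: build a fake dataset/job around a local config file and run one task
    python -m iceprod.core.i3exec --config dataset_config.json --task generate --dataset-num 1234 --job-num 0

    # real dataset mode: fetch the task definition from the IceProd server
    python -m iceprod.core.i3exec --dataset-id <dataset_id> --task-id <task_id>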