From d23a9d72b21433eab9ea8e78aa747d45ac132dfb Mon Sep 17 00:00:00 2001 From: Liam Marshall Date: Thu, 15 Jun 2017 12:18:35 -0500 Subject: [PATCH 1/3] App.py: strip all try/except blocks out --- next/apps/App.py | 261 ++++++++++++++++++++--------------------------- 1 file changed, 113 insertions(+), 148 deletions(-) diff --git a/next/apps/App.py b/next/apps/App.py index a68a8623..92123623 100644 --- a/next/apps/App.py +++ b/next/apps/App.py @@ -69,10 +69,10 @@ def alg_wrapper(alg_args={}): def init_alg(self, exp_uid, algorithm, alg_args): butler = Butler(self.app_id, exp_uid, self.myApp.TargetManager, self.butler.db, self.butler.ell, algorithm['alg_label'], algorithm['alg_id']) alg = utils.get_app_alg(self.app_id, algorithm['alg_id']) - + if 'args' in self.algs_reference_dict['initExp']: alg_args = verifier.verify(alg_args, self.algs_reference_dict['initExp']['args']) - + # I got rid of a timeit function here; it wasn't handling the # argument unpacking correctly? --Scott, 2016-3-7 # TODO: put dt back in and change log_entry to relfect that @@ -80,7 +80,7 @@ def init_alg(self, exp_uid, algorithm, alg_args): alg_response = verifier.verify({'rets':alg_response}, {'rets':self.algs_reference_dict['initExp']['rets']}) log_entry = {'exp_uid':exp_uid, 'alg_label':algorithm['alg_label'], 'task':'initExp', 'duration':-1, 'timestamp':utils.datetimeNow()} self.butler.log('ALG-DURATION', log_entry) - + def init_app(self, exp_uid, alg_list, args): utils.debug_print(str(args)) def init_algs_wrapper(alg_args={}): @@ -90,161 +90,126 @@ def init_algs_wrapper(alg_args={}): self.butler.algorithms.set(uid=algorithm['alg_label'], value=algorithm) self.init_alg(exp_uid, algorithm, alg_args) # params = algorithm.get('params',None) - + return self.myApp.initExp(self.butler, init_algs_wrapper, args) - + def initExp(self, exp_uid, args_json): - try: - self.helper.ensure_indices(self.app_id,self.butler.db, self.butler.ell) - args_dict = self.helper.convert_json(args_json) - args_dict = verifier.verify(args_dict, self.reference_dict['initExp']['args']) - args_dict['exp_uid'] = exp_uid # to get doc from db - args_dict['start_date'] = utils.datetime2str(utils.datetimeNow()) - self.butler.admin.set(uid=exp_uid,value={'exp_uid': exp_uid, 'app_id':self.app_id, 'start_date':str(utils.datetimeNow())}) - utils.debug_print("ASD "+str(args_dict)) - args_dict['args'] = self.init_app(exp_uid, args_dict['args']['alg_list'], args_dict['args']) - args_dict['git_hash'] = git_hash - self.butler.experiment.set(value=args_dict) - return '{}', True, '' - except Exception, error: - exc_type, exc_value, exc_traceback = sys.exc_info() - full_error = str(traceback.format_exc())+'\n'+str(error) - utils.debug_print("initExp Exception: " + full_error, color='red') - log_entry = { 'exp_uid':exp_uid,'task':'initExp','error':full_error,'timestamp':utils.datetimeNow(),'args_json':args_json } - self.butler.ell.log( self.app_id+':APP-EXCEPTION', log_entry ) - traceback.print_tb(exc_traceback) - return '{}', False, str(error) + self.helper.ensure_indices(self.app_id, self.butler.db, self.butler.ell) + args_dict = self.helper.convert_json(args_json) + args_dict = verifier.verify(args_dict, self.reference_dict['initExp']['args']) + args_dict['exp_uid'] = exp_uid # to get doc from db + args_dict['start_date'] = utils.datetime2str(utils.datetimeNow()) + self.butler.admin.set(uid=exp_uid, value={'exp_uid': exp_uid, 'app_id':self.app_id, 'start_date':str(utils.datetimeNow())}) + utils.debug_print("ASD "+str(args_dict)) + args_dict['args'] = self.init_app(exp_uid, args_dict['args']['alg_list'], args_dict['args']) + args_dict['git_hash'] = git_hash + self.butler.experiment.set(value=args_dict) + return '{}', True, '' def getQuery(self, exp_uid, args_json): - try: - args_dict = self.helper.convert_json(args_json) - args_dict = verifier.verify(args_dict, self.reference_dict['getQuery']['args']) - experiment_dict = self.butler.experiment.get() - alg_list = experiment_dict['args']['alg_list'] - participant_to_algorithm_management = experiment_dict['args']['participant_to_algorithm_management'] - algorithm_management_settings = experiment_dict['args']['algorithm_management_settings'] - # Create the participant dictionary in participants bucket if needed. Also pull out label and id for this algorithm - participant_uid = args_dict['args'].get('participant_uid', args_dict['exp_uid']) - # Check to see if the first participant has come by and if not, save to db - participant_doc = self.butler.participants.get(uid=participant_uid) - first_participant_query = participant_doc==None - if first_participant_query: - participant_doc = {} - self.butler.participants.set(uid=participant_uid, value={'exp_uid':exp_uid, 'participant_uid':participant_uid}) - if (participant_uid == exp_uid) or (participant_to_algorithm_management == 'one_to_many') or (first_participant_query): - - if algorithm_management_settings['mode'] == 'fixed_proportions': - labels = [alg['alg_label'] for alg in algorithm_management_settings['params']] - prop = [prop_item['proportion'] for prop_item in algorithm_management_settings['params']] - # reorder prop and alg_list to have same order - new_alg_list = [] + args_dict = self.helper.convert_json(args_json) + args_dict = verifier.verify(args_dict, self.reference_dict['getQuery']['args']) + experiment_dict = self.butler.experiment.get() + alg_list = experiment_dict['args']['alg_list'] + participant_to_algorithm_management = experiment_dict['args']['participant_to_algorithm_management'] + algorithm_management_settings = experiment_dict['args']['algorithm_management_settings'] + # Create the participant dictionary in participants bucket if needed. Also pull out label and id for this algorithm + participant_uid = args_dict['args'].get('participant_uid', args_dict['exp_uid']) + # Check to see if the first participant has come by and if not, save to db + participant_doc = self.butler.participants.get(uid=participant_uid) + first_participant_query = participant_doc==None + if first_participant_query: + participant_doc = {} + self.butler.participants.set(uid=participant_uid, value={'exp_uid':exp_uid, 'participant_uid':participant_uid}) + if (participant_uid == exp_uid) or (participant_to_algorithm_management == 'one_to_many') or (first_participant_query): + + if algorithm_management_settings['mode'] == 'fixed_proportions': + labels = [alg['alg_label'] for alg in algorithm_management_settings['params']] + prop = [prop_item['proportion'] for prop_item in algorithm_management_settings['params']] + # reorder prop and alg_list to have same order + new_alg_list = [] + broken = False + for label in labels: broken = False - for label in labels: - broken = False - for alg in alg_list: - if label == alg['alg_label']: - new_alg_list += [alg] - broken = True - break - if not broken: - raise Exception('alg_label not present for both porportions and labels') - chosen_alg = numpy.random.choice(new_alg_list, p=prop) - elif algorithm_management_settings['mode'] == 'custom' : - chosen_alg = self.myApp.chooseAlg(self.butler, alg_list, args_dict['args']) - else: - chosen_alg = numpy.random.choice(alg_list) - - alg_id = chosen_alg['alg_id'] - alg_label = chosen_alg['alg_label'] - if (first_participant_query) and (participant_to_algorithm_management=='one_to_one'): - self.butler.participants.set(uid=participant_uid, key='alg_id',value=alg_id) - self.butler.participants.set(uid=participant_uid, key='alg_label',value=alg_label) - elif (participant_to_algorithm_management=='one_to_one'): - alg_id = participant_doc['alg_id'] - alg_label = participant_doc['alg_label'] - - query_uid = utils.getNewUID() - args_dict['args'].update(query_uid=query_uid) - query_doc = self.call_app_fn(alg_label, alg_id, 'getQuery', args_dict) - - query_doc.update({'participant_uid':participant_uid, - 'alg_id':alg_id, - 'exp_uid':exp_uid, - 'alg_label':alg_label, - 'timestamp_query_generated':str(utils.datetimeNow()), - 'query_uid':query_uid}) - self.butler.queries.set(uid=query_uid, value=query_doc) - return json.dumps({'args':query_doc,'meta':{'log_entry_durations':self.log_entry_durations}}), True,'' - except Exception, error: - exc_type, exc_value, exc_traceback = sys.exc_info() - full_error = str(traceback.format_exc())+'\n'+str(error) - utils.debug_print("getQuery Exception: " + full_error, color='red') - log_entry = { 'exp_uid':exp_uid,'task':'getQuery','error':full_error,'timestamp':utils.datetimeNow(),'args_json':args_json } - self.butler.ell.log( self.app_id+':APP-EXCEPTION', log_entry ) - traceback.print_tb(exc_traceback) - return '{}', False, str(error) + for alg in alg_list: + if label == alg['alg_label']: + new_alg_list += [alg] + broken = True + break + if not broken: + raise Exception('alg_label not present for both porportions and labels') + chosen_alg = numpy.random.choice(new_alg_list, p=prop) + elif algorithm_management_settings['mode'] == 'custom' : + chosen_alg = self.myApp.chooseAlg(self.butler, alg_list, args_dict['args']) + else: + chosen_alg = numpy.random.choice(alg_list) + + alg_id = chosen_alg['alg_id'] + alg_label = chosen_alg['alg_label'] + if (first_participant_query) and (participant_to_algorithm_management=='one_to_one'): + self.butler.participants.set(uid=participant_uid, key='alg_id', value=alg_id) + self.butler.participants.set(uid=participant_uid, key='alg_label', value=alg_label) + elif (participant_to_algorithm_management=='one_to_one'): + alg_id = participant_doc['alg_id'] + alg_label = participant_doc['alg_label'] + + query_uid = utils.getNewUID() + args_dict['args'].update(query_uid=query_uid) + query_doc = self.call_app_fn(alg_label, alg_id, 'getQuery', args_dict) + + query_doc.update({'participant_uid':participant_uid, + 'alg_id':alg_id, + 'exp_uid':exp_uid, + 'alg_label':alg_label, + 'timestamp_query_generated':str(utils.datetimeNow()), + 'query_uid':query_uid}) + self.butler.queries.set(uid=query_uid, value=query_doc) + return json.dumps({'args':query_doc,'meta':{'log_entry_durations':self.log_entry_durations}}), True, '' + def processAnswer(self, exp_uid, args_json): - try: - args_dict = self.helper.convert_json(args_json) - args_dict = verifier.verify(args_dict, self.reference_dict['processAnswer']['args']) - # Update timing info in query - query = self.butler.queries.get(uid=args_dict['args']['query_uid']) - timestamp_answer_received = args_dict['args'].get('timestamp_answer_received', None) - delta_datetime = utils.str2datetime(timestamp_answer_received) - \ - utils.str2datetime(query['timestamp_query_generated']) - round_trip_time = delta_datetime.total_seconds() - response_time = float(args_dict['args'].get('response_time',0.)) - - query_update = self.call_app_fn(query['alg_label'], query['alg_id'], 'processAnswer', args_dict) - query_update.update({'response_time':response_time, - 'network_delay':round_trip_time - response_time, - 'timestamp_answer_received': timestamp_answer_received - }) - self.butler.queries.set_many(uid=args_dict['args']['query_uid'],key_value_dict=query_update) - - return json.dumps({'args': {}, 'meta': {'log_entry_durations':self.log_entry_durations}}), True, '' - - except Exception, error: - exc_type, exc_value, exc_traceback = sys.exc_info() - full_error = str(traceback.format_exc())+'\n'+str(error) - utils.debug_print("processAnswer Exception: " + full_error, color='red') - log_entry = { 'exp_uid':exp_uid,'task':'processAnswer','error':full_error,'timestamp':utils.datetimeNow(),'args_json':args_json } - self.butler.ell.log( self.app_id+':APP-EXCEPTION', log_entry ) - traceback.print_tb(exc_traceback) - raise Exception(error) + args_dict = self.helper.convert_json(args_json) + args_dict = verifier.verify(args_dict, self.reference_dict['processAnswer']['args']) + # Update timing info in query + query = self.butler.queries.get(uid=args_dict['args']['query_uid']) + timestamp_answer_received = args_dict['args'].get('timestamp_answer_received', None) + delta_datetime = utils.str2datetime(timestamp_answer_received) - \ + utils.str2datetime(query['timestamp_query_generated']) + round_trip_time = delta_datetime.total_seconds() + response_time = float(args_dict['args'].get('response_time', 0.)) + + query_update = self.call_app_fn(query['alg_label'], query['alg_id'], 'processAnswer', args_dict) + query_update.update({'response_time':response_time, + 'network_delay':round_trip_time - response_time, + 'timestamp_answer_received': timestamp_answer_received + }) + self.butler.queries.set_many(uid=args_dict['args']['query_uid'], key_value_dict=query_update) + + return json.dumps({'args': {}, 'meta': {'log_entry_durations':self.log_entry_durations}}), True, '' + def getModel(self, exp_uid, args_json): - try: - args_dict = self.helper.convert_json(args_json) - args_dict = verifier.verify(args_dict, self.reference_dict['getModel']['args']) - alg_label = args_dict['args']['alg_label'] - args = self.butler.experiment.get(key='args') - for algorithm in args['alg_list']: - if alg_label == algorithm['alg_label']: - alg_id = algorithm['alg_id'] - - myapp_response = self.call_app_fn(alg_label, alg_id, 'getModel', args_dict) - - myapp_response['exp_uid'] = exp_uid - myapp_response['alg_label'] = alg_label - # Log the response of the getModel in ALG-EVALUATION - if args_dict['args']['logging']: - alg_log_entry = {'exp_uid': exp_uid, 'alg_label':alg_label, 'task': 'getModel', 'timestamp': str(utils.datetimeNow())} - alg_log_entry.update(myapp_response) - self.butler.log('ALG-EVALUATION', alg_log_entry) - return json.dumps({'args': myapp_response, - 'meta': {'log_entry_durations':self.log_entry_durations, - 'timestamp': str(utils.datetimeNow())}}), True, '' - except Exception, error: - exc_type, exc_value, exc_traceback = sys.exc_info() - full_error = str(traceback.format_exc())+'\n'+str(error) - utils.debug_print("getModel Exception: " + full_error, color='red') - log_entry = { 'exp_uid':exp_uid,'task':'getModel','error':full_error,'timestamp':utils.datetimeNow(),'args_json':args_json } - self.butler.ell.log( self.app_id+':APP-EXCEPTION', log_entry ) - traceback.print_tb(exc_traceback) - return Exception(error) + args_dict = self.helper.convert_json(args_json) + args_dict = verifier.verify(args_dict, self.reference_dict['getModel']['args']) + alg_label = args_dict['args']['alg_label'] + args = self.butler.experiment.get(key='args') + for algorithm in args['alg_list']: + if alg_label == algorithm['alg_label']: + alg_id = algorithm['alg_id'] + + myapp_response = self.call_app_fn(alg_label, alg_id, 'getModel', args_dict) + + myapp_response['exp_uid'] = exp_uid + myapp_response['alg_label'] = alg_label + # Log the response of the getModel in ALG-EVALUATION + if args_dict['args']['logging']: + alg_log_entry = {'exp_uid': exp_uid, 'alg_label':alg_label, 'task': 'getModel', 'timestamp': str(utils.datetimeNow())} + alg_log_entry.update(myapp_response) + self.butler.log('ALG-EVALUATION', alg_log_entry) + return json.dumps({'args': myapp_response, + 'meta': {'log_entry_durations':self.log_entry_durations, + 'timestamp': str(utils.datetimeNow())}}), True, '' class Helper(object): From 2e5778131dd1042256a5644414c15cd3729b6d13 Mon Sep 17 00:00:00 2001 From: Liam Marshall Date: Thu, 15 Jun 2017 12:19:13 -0500 Subject: [PATCH 2/3] tasks.py: Handle exceptions in App method invocations --- next/broker/celery_app/tasks.py | 105 ++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 40 deletions(-) diff --git a/next/broker/celery_app/tasks.py b/next/broker/celery_app/tasks.py index b502d4d0..6703b02f 100644 --- a/next/broker/celery_app/tasks.py +++ b/next/broker/celery_app/tasks.py @@ -9,6 +9,7 @@ import numpy from next.constants import DEBUG_ON import hashlib +from functools import wraps # import next.logging_client.LoggerHTTP as ell from next.database_client.DatabaseAPI import DatabaseAPI @@ -36,41 +37,65 @@ def getModel(self, args_in_json): meta = args_out_dict.get('meta',{}) if 'log_entry_durations' in meta.keys(): self.log_entry_durations = meta['log_entry_durations'] - self.log_entry_durations['timestamp'] = next.utils.datetimeNow() + self.log_entry_durations['timestamp'] = next.utils.datetimeNow() return args_out_dict['args'] - + +def handle_exception(f, app, task_name): + @wraps(f) + def wrapper(exp_uid, args_json): + try: + return f(exp_uid, args_json) + except Exception as e: + exc_type, exc_value, exc_traceback = sys.exc_info() + full_error = str(traceback.format_exc())+'\n'+str(e) + next.utils.debug_print("{} Exception: {}".format(task_name, e), color='red') + log_entry = {'exp_uid': exp_uid, + 'task': task_name, + 'error': full_error, + 'timestamp': next.utils.datetimeNow(), + 'args_json': args_json} + app.butler.ell.log(app.app_id+':APP-EXCEPTION', log_entry) + traceback.print_tb(exc_traceback) + return '{}', False, str(e) + return wrapper + # Main application task def apply(app_id, exp_uid, task_name, args_in_json, enqueue_timestamp): - enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) - dequeue_datetime = next.utils.datetimeNow() - delta_datetime = dequeue_datetime - enqueue_datetime - time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. - - # modify args_in - if task_name == 'processAnswer': - args_in_dict = json.loads(args_in_json) - args_in_dict['args']['timestamp_answer_received'] = enqueue_timestamp - args_in_json = json.dumps(args_in_dict) - # get stateless app - next_app = next.utils.get_app(app_id, exp_uid, db, ell) - # pass it to a method - method = getattr(next_app, task_name) - response, dt = next.utils.timeit(method)(exp_uid, args_in_json) - args_out_json,didSucceed,message = response - args_out_dict = json.loads(args_out_json) - if 'args' in args_out_dict: - return_value = (json.dumps(args_out_dict['args']),didSucceed,message) - meta = args_out_dict.get('meta',{}) - if 'log_entry_durations' in meta: - log_entry_durations = meta['log_entry_durations'] - log_entry_durations['app_duration'] = dt - log_entry_durations['duration_enqueued'] = time_enqueued - log_entry_durations['timestamp'] = next.utils.datetimeNow() - ell.log( app_id+':ALG-DURATION', log_entry_durations ) - else: - return_value = (args_out_json,didSucceed,message) - print '#### Finished %s, time_enqueued=%s, execution_time=%s ####' % (task_name,time_enqueued,dt) - return return_value + enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) + dequeue_datetime = next.utils.datetimeNow() + delta_datetime = dequeue_datetime - enqueue_datetime + time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. + + # modify args_in + if task_name == 'processAnswer': + args_in_dict = json.loads(args_in_json) + args_in_dict['args']['timestamp_answer_received'] = enqueue_timestamp + args_in_json = json.dumps(args_in_dict) + + # get stateless app + next_app = next.utils.get_app(app_id, exp_uid, db, ell) + # pass it to a method + method = getattr(next_app, task_name) + + response, dt = next.utils.timeit(handle_exception(method, next_app, task_name))(exp_uid, args_in_json) + args_out_json, didSucceed, message = response + args_out_dict = json.loads(args_out_json) + + if 'args' in args_out_dict: + return_value = (json.dumps(args_out_dict['args']), didSucceed, message) + meta = args_out_dict.get('meta', {}) + if 'log_entry_durations' in meta: + log_entry_durations = meta['log_entry_durations'] + log_entry_durations['app_duration'] = dt + log_entry_durations['duration_enqueued'] = time_enqueued + log_entry_durations['timestamp'] = next.utils.datetimeNow() + ell.log( app_id+':ALG-DURATION', log_entry_durations ) + else: + return_value = (args_out_json, didSucceed, message) + + print('#### Finished %s, time_enqueued=%s, execution_time=%s ####' % (task_name, time_enqueued, dt)) + + return return_value def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp): enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) @@ -93,7 +118,7 @@ def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp): app = App_Wrapper(app_id, exp_uid, db, ell) cached_doc = app.butler.dashboard.get(uid=stat_uid) cached_response = None - if (int(stat_args.get('force_recompute',0))==0) and (cached_doc is not None): + if (int(stat_args.get('force_recompute',0))==0) and (cached_doc is not None): delta_datetime = (next.utils.datetimeNow() - next.utils.str2datetime(cached_doc['timestamp'])) if delta_datetime.seconds < next.constants.DASHBOARD_STALENESS_IN_SECONDS: cached_response = json.loads(cached_doc['data_dict']) @@ -112,7 +137,7 @@ def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp): dashboard = dashboard(db, ell) stats_method = getattr(dashboard, stat_id) response,dt = next.utils.timeit(stats_method)(app,app.butler,**args_dict['args']['params']) - + save_dict = {'exp_uid':app.exp_uid, 'stat_uid':stat_uid, 'timestamp':next.utils.datetime2str(next.utils.datetimeNow()), @@ -132,7 +157,7 @@ def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp): return json.dumps(response), True, '' -def apply_sync_by_namespace(app_id, exp_uid, alg_id, alg_label, task_name, args, namespace, job_uid, enqueue_timestamp, time_limit): +def apply_sync_by_namespace(app_id, exp_uid, alg_id, alg_label, task_name, args, namespace, job_uid, enqueue_timestamp, time_limit): enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) dequeue_datetime = next.utils.datetimeNow() delta_datetime = dequeue_datetime - enqueue_datetime @@ -153,18 +178,18 @@ def apply_sync_by_namespace(app_id, exp_uid, alg_id, alg_label, task_name, args, log_entry_durations['timestamp'] = next.utils.datetimeNow() ell.log( app_id+':ALG-DURATION', log_entry_durations) print '########## Finished namespace:%s, job_uid=%s, time_enqueued=%s, execution_time=%s ##########' % (namespace,job_uid,time_enqueued,dt) - return + return except Exception, error: exc_type, exc_value, exc_traceback = sys.exc_info() print "tasks Exception: {} {}".format(error, traceback.format_exc()) - traceback.print_tb(exc_traceback) - + traceback.print_tb(exc_traceback) + # error = traceback.format_exc() - # log_entry = { 'exp_uid':exp_uid,'task':'daemonProcess','error':error,'timestamp':next.utils.datetimeNow() } + # log_entry = { 'exp_uid':exp_uid,'task':'daemonProcess','error':error,'timestamp':next.utils.datetimeNow() } # ell.log( app_id+':APP-EXCEPTION', log_entry ) return None -# forces each worker to get its own random seed. +# forces each worker to get its own random seed. @celery.signals.worker_process_init.connect() def seed_rng(**_): """ From eb7bef4dfac40b6c7bc34d6a40129b1c8522a089 Mon Sep 17 00:00:00 2001 From: Liam Marshall Date: Fri, 16 Jun 2017 11:50:44 -0500 Subject: [PATCH 3/3] Clean up tasks.py; fix indentation --- next/broker/celery_app/tasks.py | 193 ++++++++++++++++---------------- 1 file changed, 99 insertions(+), 94 deletions(-) diff --git a/next/broker/celery_app/tasks.py b/next/broker/celery_app/tasks.py index 6703b02f..6edcc6bd 100644 --- a/next/broker/celery_app/tasks.py +++ b/next/broker/celery_app/tasks.py @@ -1,27 +1,26 @@ from __future__ import absolute_import -from .celery_broker import app -import celery.signals import os import sys import time import json -import traceback -import numpy -from next.constants import DEBUG_ON import hashlib +import traceback from functools import wraps +import numpy +import celery.signals +from .celery_broker import app -# import next.logging_client.LoggerHTTP as ell -from next.database_client.DatabaseAPI import DatabaseAPI -db = DatabaseAPI() -from next.logging_client.LoggerAPI import LoggerAPI -ell = LoggerAPI() import next.utils import next.constants -import next.apps.Butler as Butler import next.lib.pijemont.verifier as verifier +from next.constants import DEBUG_ON +from next.apps.Butler import Butler +from next.database_client.DatabaseAPI import DatabaseAPI +from next.logging_client.LoggerAPI import LoggerAPI + +db = DatabaseAPI() +ell = LoggerAPI() -Butler = Butler.Butler class App_Wrapper: def __init__(self, app_id, exp_uid, db, ell): @@ -98,96 +97,102 @@ def apply(app_id, exp_uid, task_name, args_in_json, enqueue_timestamp): return return_value def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp): - enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) - dequeue_datetime = next.utils.datetimeNow() - delta_datetime = dequeue_datetime - enqueue_datetime - time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. - dir, _ = os.path.split(__file__) - reference_dict,errs = verifier.load_doc('{}/myApp.yaml'.format(app_id, app_id),"apps/") - if len(errs) > 0: - raise Exception("App YAML format errors: \n{}".format(str(errs))) - args_dict = verifier.verify(args_in_json, reference_dict['getStats']['args']) - stat_id = args_dict['args'].get('stat_id','none') - - stat_args = args_dict['args'] - - hash_object = hashlib.md5(stat_id+'_'+json.dumps(stat_args['params'])) - stat_uid = hash_object.hexdigest() - stat_uid += '_' + exp_uid - - app = App_Wrapper(app_id, exp_uid, db, ell) - cached_doc = app.butler.dashboard.get(uid=stat_uid) - cached_response = None - if (int(stat_args.get('force_recompute',0))==0) and (cached_doc is not None): - delta_datetime = (next.utils.datetimeNow() - next.utils.str2datetime(cached_doc['timestamp'])) - if delta_datetime.seconds < next.constants.DASHBOARD_STALENESS_IN_SECONDS: + enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) + dequeue_datetime = next.utils.datetimeNow() + + delta_datetime = dequeue_datetime - enqueue_datetime + time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. + dir, _ = os.path.split(__file__) + reference_dict,errs = verifier.load_doc('{}/myApp.yaml'.format(app_id, app_id),"apps/") + if len(errs) > 0: + raise Exception("App YAML format errors: \n{}".format(str(errs))) + args_dict = verifier.verify(args_in_json, reference_dict['getStats']['args']) + stat_id = args_dict['args'].get('stat_id','none') + + stat_args = args_dict['args'] + + hash_object = hashlib.md5(stat_id+'_'+json.dumps(stat_args['params'])) + stat_uid = hash_object.hexdigest() + stat_uid += '_' + exp_uid + + app = App_Wrapper(app_id, exp_uid, db, ell) + cached_doc = app.butler.dashboard.get(uid=stat_uid) + cached_response = None + if (int(stat_args.get('force_recompute',0))==0) and (cached_doc is not None): + delta_datetime = (next.utils.datetimeNow() - next.utils.str2datetime(cached_doc['timestamp'])) + if delta_datetime.seconds < next.constants.DASHBOARD_STALENESS_IN_SECONDS: cached_response = json.loads(cached_doc['data_dict']) if 'meta' not in cached_response: - cached_response['meta']={} + cached_response['meta']={} cached_response['meta']['cached'] = 1 if delta_datetime.seconds/60<1: cached_response['meta']['last_dashboard_update'] = '<1 minute ago' else: cached_response['meta']['last_dashboard_update'] = str(delta_datetime.seconds/60)+' minutes ago' - if cached_response==None: - dashboard_string = 'apps.' + app_id + '.dashboard.Dashboard' - dashboard_module = __import__(dashboard_string, fromlist=['']) - dashboard = getattr(dashboard_module, 'MyAppDashboard') - dashboard = dashboard(db, ell) - stats_method = getattr(dashboard, stat_id) - response,dt = next.utils.timeit(stats_method)(app,app.butler,**args_dict['args']['params']) - - save_dict = {'exp_uid':app.exp_uid, - 'stat_uid':stat_uid, - 'timestamp':next.utils.datetime2str(next.utils.datetimeNow()), - 'data_dict':json.dumps(response)} - app.butler.dashboard.set_many(uid=stat_uid,key_value_dict=save_dict) - - # update the admin timing with the timing of a getModel - if hasattr(app, 'log_entry_durations'): - app.log_entry_durations['app_duration'] = dt - app.log_entry_durations['duration_enqueued'] = time_enqueued - app.butler.ell.log(app.app_id+':ALG-DURATION', app.log_entry_durations) - else: - response = cached_response - - if DEBUG_ON: - next.utils.debug_print('#### Finished Dashboard %s, time_enqueued=%s, execution_time=%s ####' % (stat_id, time_enqueued, dt), color='white') - return json.dumps(response), True, '' + if cached_response==None: + dashboard_string = 'apps.' + app_id + '.dashboard.Dashboard' + dashboard_module = __import__(dashboard_string, fromlist=['']) + dashboard = getattr(dashboard_module, 'MyAppDashboard') + dashboard = dashboard(db, ell) + stats_method = getattr(dashboard, stat_id) + response,dt = next.utils.timeit(stats_method)(app,app.butler,**args_dict['args']['params']) + + save_dict = {'exp_uid':app.exp_uid, + 'stat_uid':stat_uid, + 'timestamp':next.utils.datetime2str(next.utils.datetimeNow()), + 'data_dict':json.dumps(response)} + app.butler.dashboard.set_many(uid=stat_uid,key_value_dict=save_dict) + + # update the admin timing with the timing of a getModel + if hasattr(app, 'log_entry_durations'): + app.log_entry_durations['app_duration'] = dt + app.log_entry_durations['duration_enqueued'] = time_enqueued + app.butler.ell.log(app.app_id+':ALG-DURATION', app.log_entry_durations) + else: + response = cached_response + + if DEBUG_ON: + next.utils.debug_print('#### Finished Dashboard %s, time_enqueued=%s, execution_time=%s ####' % (stat_id, time_enqueued, dt), color='white') + return json.dumps(response), True, '' def apply_sync_by_namespace(app_id, exp_uid, alg_id, alg_label, task_name, args, namespace, job_uid, enqueue_timestamp, time_limit): - enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) - dequeue_datetime = next.utils.datetimeNow() - delta_datetime = dequeue_datetime - enqueue_datetime - time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. - - try: - print '>>>>>>>> Starting namespace:%s, job_uid=%s, time_enqueued=%s <<<<<<<<<' % (namespace,job_uid,time_enqueued) - # get stateless app - next_app = next.utils.get_app(app_id, exp_uid, db, ell) - target_manager = next_app.myApp.TargetManager - next_alg = next.utils.get_app_alg(app_id, alg_id) - butler = Butler(app_id, exp_uid, target_manager, db, ell, alg_label, alg_id) - response,dt = next.utils.timeit(getattr(next_alg, task_name))(butler, args) - log_entry_durations = { 'exp_uid':exp_uid,'alg_label':alg_label,'task':'daemonProcess','duration':dt } - log_entry_durations.update(butler.algorithms.getDurations()) - log_entry_durations['app_duration'] = dt - log_entry_durations['duration_enqueued'] = time_enqueued - log_entry_durations['timestamp'] = next.utils.datetimeNow() - ell.log( app_id+':ALG-DURATION', log_entry_durations) - print '########## Finished namespace:%s, job_uid=%s, time_enqueued=%s, execution_time=%s ##########' % (namespace,job_uid,time_enqueued,dt) - return - except Exception, error: - exc_type, exc_value, exc_traceback = sys.exc_info() - print "tasks Exception: {} {}".format(error, traceback.format_exc()) - traceback.print_tb(exc_traceback) - - # error = traceback.format_exc() - # log_entry = { 'exp_uid':exp_uid,'task':'daemonProcess','error':error,'timestamp':next.utils.datetimeNow() } - # ell.log( app_id+':APP-EXCEPTION', log_entry ) - return None + enqueue_datetime = next.utils.str2datetime(enqueue_timestamp) + dequeue_datetime = next.utils.datetimeNow() + delta_datetime = dequeue_datetime - enqueue_datetime + time_enqueued = delta_datetime.seconds + delta_datetime.microseconds/1000000. + + try: + print '>>>>>>>> Starting namespace:%s, job_uid=%s, time_enqueued=%s <<<<<<<<<' % (namespace,job_uid,time_enqueued) + + # get stateless app + next_app = next.utils.get_app(app_id, exp_uid, db, ell) + target_manager = next_app.myApp.TargetManager + next_alg = next.utils.get_app_alg(app_id, alg_id) + butler = Butler(app_id, exp_uid, target_manager, db, ell, alg_label, alg_id) + + response,dt = next.utils.timeit(getattr(next_alg, task_name))(butler, args) + + log_entry_durations = { 'exp_uid':exp_uid,'alg_label':alg_label,'task':'daemonProcess','duration':dt } + log_entry_durations.update(butler.algorithms.getDurations()) + log_entry_durations['app_duration'] = dt + log_entry_durations['duration_enqueued'] = time_enqueued + log_entry_durations['timestamp'] = next.utils.datetimeNow() + ell.log( app_id+':ALG-DURATION', log_entry_durations) + + print '########## Finished namespace:%s, job_uid=%s, time_enqueued=%s, execution_time=%s ##########' % (namespace,job_uid,time_enqueued,dt) + return + except Exception, error: + exc_type, exc_value, exc_traceback = sys.exc_info() + print "tasks Exception: {} {}".format(error, traceback.format_exc()) + traceback.print_tb(exc_traceback) + + error = traceback.format_exc() + log_entry = {'exp_uid': exp_uid, 'task': task_name, 'error': error, 'timestamp': next.utils.datetimeNow()} + ell.log(app_id+':APP-EXCEPTION', log_entry) + + return None # forces each worker to get its own random seed. @celery.signals.worker_process_init.connect() @@ -199,7 +204,7 @@ def seed_rng(**_): # If celery isn't off, celery-wrap the functions so they can be called with apply_async if next.constants.CELERY_ON: - apply = app.task(apply) - apply_dashboard = app.task(apply_dashboard) - apply_sync_by_namespace = app.task(apply_sync_by_namespace) + apply = app.task(apply) + apply_dashboard = app.task(apply_dashboard) + apply_sync_by_namespace = app.task(apply_sync_by_namespace)