diff --git a/api/base.py b/api/base.py
index 236d20fe7..a9f817817 100644
--- a/api/base.py
+++ b/api/base.py
@@ -2,6 +2,7 @@
 import datetime
 import json
 import jsonschema
+import pymongo
 import requests
 import traceback
 import urllib
@@ -10,10 +11,13 @@
 from . import util
 from . import config
+from .types import Origin
 from . import validators
 
 log = config.log
 
+# When authenticating as a drone, the user agent must start with this prefix.
+DRONE_PREFIX = 'SciTran Drone '
 
 class RequestHandler(webapp2.RequestHandler):
 
 
@@ -29,6 +33,7 @@ def __init__(self, request=None, response=None):
         self.uid = None
         self.source_site = None
         drone_request = False
+        drone_name = ''
 
         user_agent = self.request.headers.get('User-Agent', '')
         access_token = self.request.headers.get('Authorization', None)
@@ -75,13 +80,14 @@ def __init__(self, request=None, response=None):
             self.uid = self.get_param('user')
 
         # Drone shared secret authentication
-        elif drone_secret is not None and user_agent.startswith('SciTran Drone '):
+        elif drone_secret is not None and user_agent.startswith(DRONE_PREFIX):
             if config.get_item('core', 'drone_secret') is None:
                 self.abort(401, 'drone secret not configured')
             if drone_secret != config.get_item('core', 'drone_secret'):
                 self.abort(401, 'invalid drone secret')
-            log.info('drone "' + user_agent.replace('SciTran Drone ', '') + '" request accepted')
             drone_request = True
+            drone_name = user_agent.replace(DRONE_PREFIX, '')
+            log.info('drone "' + drone_name + '" request accepted')
 
         # Cross-site authentication
         elif user_agent.startswith('SciTran Instance '):
@@ -116,6 +122,61 @@ def __init__(self, request=None, response=None):
         else:
             self.superuser_request = False
 
+        self.set_origin(drone_request, drone_name)
+
+    def set_origin(self, drone_request, drone_name):
+        """
+        Add an origin to the request object. Used later in request handler logic.
+
+        Pretty clear duplication of logic with superuser_request / drone_request;
+        this map serves a different purpose, and specifically matches the desired file-origin map.
+        Might be a good future project to remove one or the other.
+        """
+
+        if self.uid is not None:
+            self.origin = {
+                'type': str(Origin.user),
+                'id': self.uid
+            }
+        elif drone_request:
+            self.origin = {
+                'type': str(Origin.device),
+                'id': drone_name
+            }
+
+            # Upsert device record, with last-contacted time.
+            # In the future, consider merging any keys into self.origin?
+            device_record = config.db['devices'].find_one_and_update({
+                    '_id': self.origin['id']
+                }, {
+                    '$set': {
+                        '_id': self.origin['id'],
+                        'last-seen': datetime.datetime.utcnow()
+                    }
+                },
+                upsert=True,
+                return_document=pymongo.collection.ReturnDocument.AFTER
+            )
+
+            # Bit hackish - detect from route if a job is the origin, and if so what job ID.
+            # Could be removed if routes get reorganized. POST /api/jobs/id/result, maybe?
+            is_job_upload = self.request.path.startswith('/api/engine')
+            job_id = self.request.GET.get('job')
+
+            # This runs after the standard drone-request upsert above so that we can still update the last-seen timestamp.
+            if is_job_upload and job_id is not None:
+                self.origin = {
+                    'type': str(Origin.job),
+                    'id': job_id
+                }
+        else:
+            self.origin = {
+                'type': str(Origin.unknown),
+                'id': None
+            }
+
+        # print json.dumps(self.origin)
+
     def is_true(self, param):
         return self.request.GET.get(param, '').lower() in ('1', 'true')
 
diff --git a/api/handlers/containerhandler.py b/api/handlers/containerhandler.py
index 5112daeea..a47152758 100644
--- a/api/handlers/containerhandler.py
+++ b/api/handlers/containerhandler.py
@@ -1,4 +1,5 @@
 import bson
+import json
 import datetime
 
 from .. import base
@@ -8,6 +9,7 @@
 from .. import validators
 from ..auth import containerauth, always_ok
 from ..dao import APIStorageException, containerstorage
+from ..types import Origin
 
 log = config.log
 
@@ -97,6 +99,48 @@ def get(self, cont_name, **kwargs):
                 fileinfo['path'] = util.path_from_hash(fileinfo['hash'])
         if self.debug:
             debuginfo.add_debuginfo(self, cont_name, result)
+
+        return self.handle_origin(result)
+
+    def handle_origin(self, result):
+        """
+        Given an object with a `files` array key, coalesce and merge file origins if requested.
+        """
+
+        # If `join=origin` passed as a request param, join out that key
+        join_origin = 'origin' in self.request.params.getall('join')
+
+        # If it was requested, create a map of each type of origin to hold the join
+        if join_origin:
+            result['join-origin'] = {
+                Origin.user.name: {},
+                Origin.device.name: {},
+                Origin.job.name: {}
+            }
+
+        for f in result.get('files', []):
+            origin = f.get('origin', None)
+
+            if origin is None:
+                # Backfill origin maps if none provided from DB
+                f['origin'] = {
+                    'type': str(Origin.unknown),
+                    'id': None
+                }
+
+            elif join_origin:
+                j_type = f['origin']['type']
+                j_id = f['origin']['id']
+                j_id_b = j_id
+
+                # Some tables don't use BSON for their primary keys.
+                if j_type not in (Origin.user, Origin.device):
+                    j_id_b = bson.ObjectId(j_id)
+
+                # Join from database if we haven't for this origin before
+                if result['join-origin'][j_type].get(j_id, None) is None:
+                    result['join-origin'][j_type][j_id] = config.db[j_type + 's'].find_one({'_id': j_id_b})
+
         return result
 
     def _filter_permissions(self, result, uid, site):
@@ -147,6 +191,10 @@ def get_all(self, cont_name, par_cont_name=None, par_id=None):
             self._add_session_measurements(results)
         if self.debug:
             debuginfo.add_debuginfo(self, cont_name, results)
+
+        for result in results:
+            result = self.handle_origin(result)
+
         return results
 
     def _filter_all_permissions(self, results, uid, site):
diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py
index 9ac1181c2..f9bd4adda 100644
--- a/api/handlers/listhandler.py
+++ b/api/handlers/listhandler.py
@@ -368,7 +368,7 @@ def post(self, cont_name, list_name, **kwargs):
         container, permchecker, storage, mongo_validator, payload_validator, keycheck = self._initialize_request(cont_name_plural, list_name, _id)
         permchecker(noop)('POST', _id=_id)
 
-        return upload.process_upload(self.request, upload.Strategy.targeted, cont_name, _id)
+        return upload.process_upload(self.request, upload.Strategy.targeted, cont_name, _id, self.origin)
 
     def packfile(self, cont_name, **kwargs):
         _id = kwargs.pop('cid')
@@ -394,4 +394,4 @@ def packfile(self, cont_name, **kwargs):
         else:
             raise Exception('Not authorized')
 
-        return upload.process_upload(self.request, upload.Strategy.packfile)
+        return upload.process_upload(self.request, upload.Strategy.packfile, None, None, self.origin)
diff --git a/api/jobs.py b/api/jobs.py
index 117e03306..447f821bf 100644
--- a/api/jobs.py
+++ b/api/jobs.py
@@ -172,7 +172,7 @@ def retry_job(db, j, force=False):
         log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
 
 
-def generate_formula(algorithm_id, i):
+def generate_formula(algorithm_id, i, job_id=None):
     """
     Given an intent, generates a formula to execute a job.
 
@@ -182,6 +182,8 @@ def generate_formula(algorithm_id, i):
         Human-friendly unique name of the algorithm
     i: FileInput
         The input to be used by this job
+    job_id: string
+        The job ID this will be placed on. Enhances the file origin by adding the job ID to the upload URL.
     """
 
     gear = get_gear_by_name(algorithm_id)
@@ -211,6 +213,9 @@ def generate_formula(algorithm_id, i):
         ],
     }
 
+    if job_id:
+        f['outputs'][0]['uri'] += '&job=' + job_id
+
     return f
 
 
@@ -286,13 +291,15 @@ def next(self):
         if result is None:
             self.abort(400, 'No jobs to process')
 
+        str_id = str(result['_id'])
+
         # Second, update document to store formula request.
         result = config.db.jobs.find_one_and_update(
             { '_id': result['_id'] },
             { '$set': {
-                'request': generate_formula(result['algorithm_id'], result['input'])}
+                'request': generate_formula(result['algorithm_id'], result['input'], str_id)}
             },
             return_document=pymongo.collection.ReturnDocument.AFTER
         )
 
 
diff --git a/api/placer.py b/api/placer.py
index d4806254a..95b13a045 100644
--- a/api/placer.py
+++ b/api/placer.py
@@ -21,12 +21,13 @@ class Placer(object):
     Interface for a placer, which knows how to process files and place them where they belong - on disk and database.
     """
 
-    def __init__(self, container_type, container, id, metadata, timestamp):
+    def __init__(self, container_type, container, id, metadata, timestamp, origin):
         self.container_type = container_type
-        self.container = container
-        self.id = id
-        self.metadata = metadata
-        self.timestamp = timestamp
+        self.container      = container
+        self.id             = id
+        self.metadata       = metadata
+        self.timestamp      = timestamp
+        self.origin         = origin
 
     def check(self):
         """
@@ -234,7 +235,11 @@ def finalize(self):
             'instrument': None,
             'measurements': [],
             'tags': [],
-            'metadata': {}
+            'metadata': {},
+
+            # Manually add the file origin to the packfile metadata.
+            # This is set by upload.process_upload on each file, but we're not storing those.
+            'origin': self.origin
         }
 
         # Get or create a session based on the hierarchy and provided labels.
diff --git a/api/types.py b/api/types.py
new file mode 100644
index 000000000..a167c8392
--- /dev/null
+++ b/api/types.py
@@ -0,0 +1,9 @@
+from . import util
+
+# Origin represents the different methods a request can be authenticated.
+Origin = util.Enum('Origin', {
+    'user':     'user',     # An authenticated user
+    'device':   'device',   # A connected device (reaper, script, etc)
+    'job':      'job',      # Made on behalf of a job (downloading data, uploading results, etc)
+    'unknown':  'unknown',  # Other or public
+})
diff --git a/api/upload.py b/api/upload.py
index f52cfc04a..27553d44d 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -22,7 +22,7 @@
     'packfile' : pl.PackfilePlacer # Upload N files as a new packfile to a container.
 })
 
-def process_upload(request, strategy, container_type=None, id=None):
+def process_upload(request, strategy, container_type=None, id=None, origin=None):
     """
     Universal file upload entrypoint.
 
@@ -76,7 +76,7 @@ def process_upload(request, strategy, container_type=None, id=None):
         metadata = json.loads(form['metadata'].file.getvalue())
 
     placer_class = strategy.value
-    placer = placer_class(container_type, container, id, metadata, timestamp)
+    placer = placer_class(container_type, container, id, metadata, timestamp, origin)
     placer.check()
 
     # Browsers, when sending a multipart upload, will send files with field name "file" (if sinuglar)
@@ -108,6 +108,7 @@ def process_upload(request, strategy, container_type=None, id=None):
         'modified': field.modified,
         # 'size': field.size,
         'hash': field.hash,
+        'origin': origin,
 
         'type': None,
         'instrument': None,
@@ -141,7 +142,8 @@ def reaper(self):
            hash=file_store.hash,
            mimetype=util.guess_mimetype(file_store.filename),
            tags=file_store.tags,
-           metadata=file_store.metadata.get('file', {}).get('metadata', {})
+           metadata=file_store.metadata.get('file', {}).get('metadata', {}),
+           origin=self.origin
         )
 
         target, file_metadata = reaperutil.create_container_hierarchy(file_store.metadata)
@@ -179,6 +181,7 @@ def uploader(self):
         for target, file_dict in target_containers:
             for filename, parsed_file in file_dict.items():
                 fileinfo = parsed_file.info
+                fileinfo['origin'] = self.origin
                 f = target.find(filename)
                 target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
                 if not f:
@@ -248,6 +251,7 @@ def engine(self):
             fileinfo['mimetype'] = fileinfo.get('mimetype') or util.guess_mimetype(name)
             fileinfo['created'] = now
             fileinfo['modified'] = now
+            fileinfo['origin'] = self.origin
             acquisition_obj = reaperutil.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)
 
             for f in acquisition_obj['files']:
diff --git a/api/util.py b/api/util.py
index f553ba1f9..e29d55b67 100644
--- a/api/util.py
+++ b/api/util.py
@@ -141,3 +141,17 @@ class Enum(baseEnum.Enum):
     # This overrides that behaviour and removes the prefix.
     def __str__(self):
         return str(self.name)
+
+    # Allow equality comparison with strings against the enum's name.
+
+    def __ne__(self, other):
+        if isinstance(other, basestring):
+            return self.name != other
+        else:
+            return NotImplemented
+
+    def __eq__(self, other):
+        if isinstance(other, basestring):
+            return self.name == other
+        else:
+            return NotImplemented