Skip to content

Commit

Permalink
Add origin tracking to files
Browse files Browse the repository at this point in the history
  • Loading branch information
kofalt committed Mar 14, 2016
1 parent 58e4cd2 commit d5a502b
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 15 deletions.
65 changes: 63 additions & 2 deletions api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import json
import jsonschema
import pymongo
import requests
import traceback
import urllib
Expand All @@ -10,10 +11,13 @@

from . import util
from . import config
from .types import Origin
from . import validators

log = config.log

# When authenticating as a drone, the user agent must start with this prefix.
DRONE_PREFIX = 'SciTran Drone '

class RequestHandler(webapp2.RequestHandler):

Expand All @@ -29,6 +33,7 @@ def __init__(self, request=None, response=None):
self.uid = None
self.source_site = None
drone_request = False
drone_name = ''

user_agent = self.request.headers.get('User-Agent', '')
access_token = self.request.headers.get('Authorization', None)
Expand Down Expand Up @@ -75,13 +80,14 @@ def __init__(self, request=None, response=None):
self.uid = self.get_param('user')

# Drone shared secret authentication
elif drone_secret is not None and user_agent.startswith('SciTran Drone '):
elif drone_secret is not None and user_agent.startswith(DRONE_PREFIX):
if config.get_item('core', 'drone_secret') is None:
self.abort(401, 'drone secret not configured')
if drone_secret != config.get_item('core', 'drone_secret'):
self.abort(401, 'invalid drone secret')
log.info('drone "' + user_agent.replace('SciTran Drone ', '') + '" request accepted')
drone_request = True
drone_name = user_agent.replace(DRONE_PREFIX, '')
log.info('drone "' + drone_name + '" request accepted')

# Cross-site authentication
elif user_agent.startswith('SciTran Instance '):
Expand Down Expand Up @@ -116,6 +122,61 @@ def __init__(self, request=None, response=None):
else:
self.superuser_request = False

self.set_origin(drone_request, drone_name)

def set_origin(self, drone_request, drone_name):
"""
Add an origin to the request object. Used later in request handler logic.
Pretty clear duplication of logic with superuser_request / drone_request;
this map serves a different purpose, and specifically matches the desired file-origin map.
Might be a good future project to remove one or the other.
"""

if self.uid is not None:
self.origin = {
'type': str(Origin.user),
'id': self.uid
}
elif drone_request:
self.origin = {
'type': str(Origin.device),
'id': drone_name
}

# Upsert device record, with last-contacted time.
# In the future, consider merging any keys into self.origin?
device_record = config.db['devices'].find_one_and_update({
'_id': self.origin['id']
}, {
'$set': {
'_id': self.origin['id'],
'last-seen': datetime.datetime.utcnow()
}
},
upsert=True,
return_document=pymongo.collection.ReturnDocument.AFTER
)

# Bit hackish - detect from route if a job is the origin, and if so what job ID.
# Could be removed if routes get reorganized. POST /api/jobs/id/result, maybe?
is_job_upload = self.request.path.startswith('/api/engine')
job_id = self.request.GET.get('job')

# This runs after the standard drone-request upsert above so that we can still update the last-seen timestamp.
if is_job_upload and job_id is not None:
self.origin = {
'type': str(Origin.job),
'id': job_id
}
else:
self.origin = {
'type': str(Origin.unknown),
'id': None
}

# print json.dumps(self.origin)

def is_true(self, param):
return self.request.GET.get(param, '').lower() in ('1', 'true')

Expand Down
48 changes: 48 additions & 0 deletions api/handlers/containerhandler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import bson
import json
import datetime

from .. import base
Expand All @@ -8,6 +9,7 @@
from .. import validators
from ..auth import containerauth, always_ok
from ..dao import APIStorageException, containerstorage
from ..types import Origin

log = config.log

Expand Down Expand Up @@ -97,6 +99,48 @@ def get(self, cont_name, **kwargs):
fileinfo['path'] = util.path_from_hash(fileinfo['hash'])
if self.debug:
debuginfo.add_debuginfo(self, cont_name, result)

return self.handle_origin(result)

def handle_origin(self, result):
"""
Given an object with a `files` array key, coalesce and merge file origins if requested.
"""

# If `join=origin` passed as a request param, join out that key
join_origin = 'origin' in self.request.params.getall('join')

# If it was requested, create a map of each type of origin to hold the join
if join_origin:
result['join-origin'] = {
Origin.user.name: {},
Origin.device.name: {},
Origin.job.name: {}
}

for f in result.get('files', []):
origin = f.get('origin', None)

if origin is None:
# Backfill origin maps if none provided from DB
f['origin'] = {
'type': str(Origin.unknown),
'id': None
}

elif join_origin:
j_type = f['origin']['type']
j_id = f['origin']['id']
j_id_b = j_id

# Some tables don't use BSON for their primary keys.
if j_type not in (Origin.user, Origin.device):
j_id_b = bson.ObjectId(j_id)

# Join from database if we haven't for this origin before
if result['join-origin'][j_type].get(j_id, None) is None:
result['join-origin'][j_type][j_id] = config.db[j_type + 's'].find_one({'_id': j_id_b})

return result

def _filter_permissions(self, result, uid, site):
Expand Down Expand Up @@ -147,6 +191,10 @@ def get_all(self, cont_name, par_cont_name=None, par_id=None):
self._add_session_measurements(results)
if self.debug:
debuginfo.add_debuginfo(self, cont_name, results)

for result in results:
result = self.handle_origin(result)

return results

def _filter_all_permissions(self, results, uid, site):
Expand Down
4 changes: 2 additions & 2 deletions api/handlers/listhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def post(self, cont_name, list_name, **kwargs):
container, permchecker, storage, mongo_validator, payload_validator, keycheck = self._initialize_request(cont_name_plural, list_name, _id)
permchecker(noop)('POST', _id=_id)

return upload.process_upload(self.request, upload.Strategy.targeted, cont_name, _id)
return upload.process_upload(self.request, upload.Strategy.targeted, cont_name, _id, self.origin)

def packfile(self, cont_name, **kwargs):
_id = kwargs.pop('cid')
Expand All @@ -394,4 +394,4 @@ def packfile(self, cont_name, **kwargs):
else:
raise Exception('Not authorized')

return upload.process_upload(self.request, upload.Strategy.packfile)
return upload.process_upload(self.request, upload.Strategy.packfile, None, None, self.origin)
11 changes: 9 additions & 2 deletions api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def retry_job(db, j, force=False):
log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))


def generate_formula(algorithm_id, i):
def generate_formula(algorithm_id, i, job_id=None):
"""
Given an intent, generates a formula to execute a job.
Expand All @@ -182,6 +182,8 @@ def generate_formula(algorithm_id, i):
Human-friendly unique name of the algorithm
i: FileInput
The input to be used by this job
job_id: string
The job ID this will be placed on. Enhances the file origin by adding the job ID to the upload URL.
"""

gear = get_gear_by_name(algorithm_id)
Expand Down Expand Up @@ -211,6 +213,9 @@ def generate_formula(algorithm_id, i):
],
}

if job_id:
f['outputs'][0]['uri'] += '&job=' + job_id

return f


Expand Down Expand Up @@ -286,13 +291,15 @@ def next(self):
if result is None:
self.abort(400, 'No jobs to process')

str_id = str(result['_id'])

# Second, update document to store formula request.
result = config.db.jobs.find_one_and_update(
{
'_id': result['_id']
},
{ '$set': {
'request': generate_formula(result['algorithm_id'], result['input'])}
'request': generate_formula(result['algorithm_id'], result['input'], str_id)}
},
return_document=pymongo.collection.ReturnDocument.AFTER
)
Expand Down
17 changes: 11 additions & 6 deletions api/placer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ class Placer(object):
Interface for a placer, which knows how to process files and place them where they belong - on disk and database.
"""

def __init__(self, container_type, container, id, metadata, timestamp):
def __init__(self, container_type, container, id, metadata, timestamp, origin):
self.container_type = container_type
self.container = container
self.id = id
self.metadata = metadata
self.timestamp = timestamp
self.container = container
self.id = id
self.metadata = metadata
self.timestamp = timestamp
self.origin = origin

def check(self):
"""
Expand Down Expand Up @@ -234,7 +235,11 @@ def finalize(self):
'instrument': None,
'measurements': [],
'tags': [],
'metadata': {}
'metadata': {},

# Manually add the file orign to the packfile metadata.
# This is set by upload.process_upload on each file, but we're not storing those.
'origin': self.origin
}

# Get or create a session based on the hierarchy and provided labels.
Expand Down
9 changes: 9 additions & 0 deletions api/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from . import util

# Origin represents the different methods a request can be authenticated.
Origin = util.Enum('Origin', {
'user': 'user', # An authenticated user
'device': 'device', # A connected device (reaper, script, etc)
'job': 'job', # Made on behalf of a job (downloading data, uploading results, etc)
'unknown': 'unknown', # Other or public
})
10 changes: 7 additions & 3 deletions api/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
'packfile' : pl.PackfilePlacer # Upload N files as a new packfile to a container.
})

def process_upload(request, strategy, container_type=None, id=None):
def process_upload(request, strategy, container_type=None, id=None, origin=None):
"""
Universal file upload entrypoint.
Expand Down Expand Up @@ -76,7 +76,7 @@ def process_upload(request, strategy, container_type=None, id=None):
metadata = json.loads(form['metadata'].file.getvalue())

placer_class = strategy.value
placer = placer_class(container_type, container, id, metadata, timestamp)
placer = placer_class(container_type, container, id, metadata, timestamp, origin)
placer.check()

# Browsers, when sending a multipart upload, will send files with field name "file" (if sinuglar)
Expand Down Expand Up @@ -108,6 +108,7 @@ def process_upload(request, strategy, container_type=None, id=None):
'modified': field.modified, #
'size': field.size,
'hash': field.hash,
'origin': origin,

'type': None,
'instrument': None,
Expand Down Expand Up @@ -141,7 +142,8 @@ def reaper(self):
hash=file_store.hash,
mimetype=util.guess_mimetype(file_store.filename),
tags=file_store.tags,
metadata=file_store.metadata.get('file', {}).get('metadata', {})
metadata=file_store.metadata.get('file', {}).get('metadata', {}),
origin=self.origin
)

target, file_metadata = reaperutil.create_container_hierarchy(file_store.metadata)
Expand Down Expand Up @@ -179,6 +181,7 @@ def uploader(self):
for target, file_dict in target_containers:
for filename, parsed_file in file_dict.items():
fileinfo = parsed_file.info
fileinfo['origin'] = self.origin
f = target.find(filename)
target_path = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash']))
if not f:
Expand Down Expand Up @@ -248,6 +251,7 @@ def engine(self):
fileinfo['mimetype'] = fileinfo.get('mimetype') or util.guess_mimetype(name)
fileinfo['created'] = now
fileinfo['modified'] = now
fileinfo['origin'] = self.origin
acquisition_obj = reaperutil.add_fileinfo('acquisitions', acquisition_obj['_id'], fileinfo)

for f in acquisition_obj['files']:
Expand Down
14 changes: 14 additions & 0 deletions api/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,17 @@ class Enum(baseEnum.Enum):
# This overrides that behaviour and removes the prefix.
def __str__(self):
return str(self.name)

# Allow equality comparison with strings against the enum's name.

def __ne__(self, other):
if isinstance(other, basestring):
return self.name != other
else:
return super.__ne__(other)

def __eq__(self, other):
if isinstance(other, basestring):
return self.name == other
else:
return super.__eq__(other)

0 comments on commit d5a502b

Please sign in to comment.