Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed code smells #2

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions sandbox_exporter/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def get_prefixes(self, sdate, edate, pilot=None, message_type=None):
'sdate': sdate.strftime('%Y%m%d%H'),
'edate': edate.strftime('%Y%m%d%H')
}
fp = lambda filenum: (self.output_convention+'_{filenum}').format(filenum=filenum, **fp_params)
sfolder = self.get_folder_prefix(pilot, message_type, sdate)
efolder = self.get_folder_prefix(pilot, message_type, edate)

Expand Down Expand Up @@ -155,8 +154,8 @@ def export_to_file(self, sdate, edate=None, pilot=None, message_type=None,
fp = lambda filenum: (self.output_convention+'_{filenum}').format(filenum=filenum, **fp_params)

if csv and not output_fields:
flattenerMod = load_flattener('{}/{}'.format(pilot, message_type.upper()))
flattener = flattenerMod()
flattener_mod = load_flattener('{}/{}'.format(pilot, message_type.upper()))
flattener = flattener_mod()
else:
flattener=DataFlattener()

Expand Down
9 changes: 4 additions & 5 deletions sandbox_exporter/flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def transform(self, raw_rec, rename_prefix_fields=[], rename_fields=[],
del out[old_f]

out = {k: int(v) if k in int_fields else v for k,v in out.items()}
# out = {k: json.dumps(v) if k in json_string_fields else v for k,v in out.items()}
return out

def add_enhancements(self, rec):
Expand Down Expand Up @@ -93,7 +92,7 @@ def __init__(self, *args, **kwargs):
]
self.rename_fields = [
('metadata_dataType', 'dataType'),
('metadata_recordGeneratedAt', 'metadata_generatedAt'),
('metadata_recordGeneratedAt', 'metadata_generated_at'),
('metadata_recordGeneratedBy', 'metadata_generatedBy')
]
self.int_fields = [
Expand All @@ -103,8 +102,8 @@ def __init__(self, *args, **kwargs):
self.json_string_fields = ['size']

def add_enhancements(self, rec):
metadata_generatedAt = parse_date(rec['metadata_generatedAt'])
rec['metadata_generatedAt'] = metadata_generatedAt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
metadata_generated_at = parse_date(rec['metadata_generated_at'])
rec['metadata_generated_at'] = metadata_generated_at.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
rec['randomNum'] = random.random()
rec['metadata_generatedAt_timeOfDay'] = metadata_generatedAt.hour + metadata_generatedAt.minute/60 + metadata_generatedAt.second/3600
rec['mmetadata_generated_at_timeOfDay'] = metadata_generated_at.hour + metadata_generated_at.minute/60 + metadata_generated_at.second/3600
return rec
22 changes: 11 additions & 11 deletions sandbox_exporter/flattener_thea.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def process(self, raw_rec):
out.update(part2_val_out)
del out['payload_data_partII_SEQUENCE']

if 'coreData_position_long' in out:
coreData_position_long = float(out['coreData_position_long'])/10e6
coreData_position_lat = float(out['coreData_position_lat'])/10e6
out['coreData_position'] = "POINT ({} {})".format(coreData_position_long, coreData_position_lat)
if 'core_data_position_long' in out:
core_data_position_long = float(out['core_data_position_long'])/10e6
core_data_position_lat = float(out['core_data_position_lat'])/10e6
out['core_data_position_long'] = "POINT ({} {})".format(core_data_position_long, core_data_position_lat)

if 'coreData_size_width' in out:
out['coreData_size'] = json.dumps({'width': int(out['coreData_size_width']),
Expand Down Expand Up @@ -104,8 +104,8 @@ class TheaTIMFlattener(CvDataFlattener):
def __init__(self, **kwargs):
super(TheaTIMFlattener, self).__init__(**kwargs)
self.rename_prefix_fields += [
('payload_data_TravelerInformation_dataFrames_TravelerDataFrame_', 'travelerdataframe_'),
('payload_data_TravelerInformation_', 'travelerinformation_'),
('payload_data_Traveler_information_dataFrames_TravelerDataFrame_', 'travelerdataframe_'),
('payload_data_Traveler_information_', '_'),
('travelerdataframe_regions_GeographicalPath_description_path_', 'travelerdataframe_desc_'),
('travelerdataframe_regions_GeographicalPath_', 'travelerdataframe_'),
('_SEQUENCE', '_sequence'),
Expand All @@ -131,20 +131,20 @@ def process(self, raw_rec):
out = super(TheaTIMFlattener, self).process(raw_rec)

if 'travelerdataframe_msgId_lat' in out:
travelerdataframe_msgId_lat = float(out['travelerdataframe_msgId_lat'])/10e6
travelerdataframe_msgId_long = float(out['travelerdataframe_msgId_long'])/10e6
out['travelerdataframe_msgId_position'] = "POINT ({} {})".format(travelerdataframe_msgId_long, travelerdataframe_msgId_lat)
travelerdataframe_msgid_lat = float(out['travelerdataframe_msgid_lat'])/10e6
travelerdataframe_msgid_long = float(out['travelerdataframe_msgid_long'])/10e6
out['travelerdataframe_msgId_position'] = "POINT ({} {})".format(travelerdataframe_msgid_long, travelerdataframe_msgid_lat)

return out

def process_and_split(self, raw_rec):
out_recs = []
try:
tdfs = copy.deepcopy(raw_rec['payload']['data']['TravelerInformation']['dataFrames']['TravelerDataFrame'])
tdfs = copy.deepcopy(raw_rec['payload']['data']['Traveler_information']['dataFrames']['TravelerDataFrame'])
if type(tdfs) == list:
for tdf in tdfs:
temp_rec = copy.deepcopy(raw_rec)
temp_rec['payload']['data']['TravelerInformation']['dataFrames']['TravelerDataFrame'] = tdf
temp_rec['payload']['data']['Traveler_information']['dataFrames']['TravelerDataFrame'] = tdf
out_recs.append(temp_rec)
else:
out_recs.append(raw_rec)
Expand Down
60 changes: 30 additions & 30 deletions sandbox_exporter/flattener_wydot.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, **kwargs):
('payload_data_coreData', 'coreData'),
]
self.rename_fields += [
('metadata_odeReceivedAt', 'metadata_receivedAt'),
('metadata_odeReceivedAt', 'metadata_received_at'),
('payload_dataType', 'dataType'),
('coreData_position_longitude', 'coreData_position_long'),
('coreData_position_latitude', 'coreData_position_lat'),
Expand Down Expand Up @@ -77,8 +77,8 @@ def process(self, raw_rec):
if 'coreData_position_long' in out:
out['coreData_position'] = "POINT ({} {})".format(out['coreData_position_long'], out['coreData_position_lat'])

metadata_receivedAt = dateutil.parser.parse(out['metadata_receivedAt'][:23])
out['metadata_receivedAt'] = metadata_receivedAt.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
metadata_received_at = dateutil.parser.parse(out['metadata_received_at'][:23])
out['metadata_received_at'] = metadata_received_at.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]

return out

Expand All @@ -99,8 +99,8 @@ def __init__(self, **kwargs):
self.rename_prefix_fields += [
('metadata_receivedMessageDetails_locationData', 'metadata_rmd'),
('metadata_receivedMessageDetails', 'metadata_rmd'),
('payload_data_MessageFrame_value_TravelerInformation_dataFrames_TravelerDataFrame', 'travelerdataframe'),
('payload_data_MessageFrame_value_TravelerInformation', 'travelerinformation'),
('payload_data_MessageFrame_value_traveler_information_dataFrames_TravelerDataFrame', 'travelerdataframe'),
('payload_data_MessageFrame_value_traveler_information', 'travelerinformation'),
('_SEQUENCE', '_sequence'),
('travelerdataframe_msgId_roadSignID_position', 'travelerdataframe_msgId'),
('travelerdataframe_msgId_roadSignID', 'travelerdataframe_msgId'),
Expand All @@ -111,7 +111,7 @@ def __init__(self, **kwargs):
]

self.rename_fields += [
('metadata_odeReceivedAt', 'metadata_receivedAt'),
('metadata_odeReceivedAt', 'metadata_received_at'),
('payload_dataType', 'dataType'),
('payload_data_MessageFrame_messageId', 'messageId'),
('travelerdataframe_desc_offset_xy_nodes_NodeXY', 'travelerdataframe_desc_nodes')
Expand All @@ -130,57 +130,57 @@ def process(self, raw_rec):
out = super(WydotTIMFlattener, self).process(raw_rec)

if 'travelerdataframe_msgId_lat' in out:
travelerdataframe_msgId_lat = float(out['travelerdataframe_msgId_lat'])/10e6
travelerdataframe_msgId_long = float(out['travelerdataframe_msgId_long'])/10e6
out['travelerdataframe_msgId_position'] = "POINT ({} {})".format(travelerdataframe_msgId_long, travelerdataframe_msgId_lat)
travelerdataframe_msgid_lat = float(out['travelerdataframe_msgid_lat'])/10e6
travelerdataframe_msgid_long = float(out['travelerdataframe_msgid_long'])/10e6
out['travelerdataframe_msgId_position'] = "POINT ({} {})".format(travelerdataframe_msgid_long, travelerdataframe_msgid_lat)

return out

def process_and_split(self, raw_rec):
'''
Turn various Traveler Information DataFrame schemas to one where the Traveler DataFrame is stored at:
rec['payload']['data']['MessageFrame']['value']['TravelerInformation']['dataFrames']['TravelerDataFrame']
rec['payload']['data']['MessageFrame']['value']['traveler_information']['dataFrames']['TravelerDataFrame']

'''
out_recs = []
travelerInformation = copy.deepcopy(raw_rec.get('payload', {}).get('data', {}).get('MessageFrame', {}).get('value', {}).get('TravelerInformation'))
traveler_information = copy.deepcopy(raw_rec.get('payload', {}).get('data', {}).get('MessageFrame', {}).get('value', {}).get('traveler_information'))

if not travelerInformation:
if not traveler_information:
return [self.process(raw_rec)]

if raw_rec['metadata']['schemaVersion'] == 5:
out_recs.append(raw_rec)
else:
# elif raw_rec['metadata']['schemaVersion'] == 6:
travelerDataFrames = travelerInformation.get('dataFrames')
if type(travelerDataFrames) == list:
tdfs = [i.get('TravelerDataFrame') for i in travelerDataFrames if i.get('TravelerDataFrame')]
if len(tdfs) != len(travelerDataFrames):
print('travelerDataFrames discrepancy: {} -> {}'.format(len(travelerDataFrames), len(tdfs)))
elif type(travelerDataFrames) == dict:
travelerDataFramesOpt1 = travelerDataFrames.get('TravelerDataFrame')
travelerDataFramesOpt2 = travelerDataFrames.get('dataFrames', {}).get('TravelerDataFrame')
tdfs = travelerDataFramesOpt1 or travelerDataFramesOpt2
traveler_data_frames = traveler_information.get('dataFrames')
if type(traveler_data_frames) == list:
tdfs = [i.get('TravelerDataFrame') for i in traveler_data_frames if i.get('TravelerDataFrame')]
if len(tdfs) != len(traveler_data_frames):
print('travelerDataFrames discrepancy: {} -> {}'.format(len(traveler_data_frames), len(tdfs)))
elif type(traveler_data_frames) == dict:
traveler_data_frames_opt1 = traveler_data_frames.get('TravelerDataFrame')
traveler_data_frames_opt2 = traveler_data_frames.get('dataFrames', {}).get('TravelerDataFrame')
tdfs = traveler_data_frames_opt1 or traveler_data_frames_opt2
if type(tdfs) != list:
tdfs = [tdfs]
else:
print('No Traveler DataFrame found in this: {}'.format(travelerDataFrames))
print('No Traveler DataFrame found in this: {}'.format(traveler_data_frames))
return [self.process(raw_rec)]

for tdf in tdfs:
GeographicalPath = copy.deepcopy(tdf.get('regions', {}).get('GeographicalPath'))
if type(GeographicalPath) == list:
for path in GeographicalPath:
tdf['regions']['GeographicalPath'] = path
geographical_path = copy.deepcopy(tdf.get('regions', {}).get('geographical_path'))
if type(geographical_path) == list:
for path in geographical_path:
tdf['regions']['geographical_path'] = path

temp_rec = copy.deepcopy(raw_rec)
temp_rec['payload']['data']['MessageFrame']['value']['TravelerInformation']['dataFrames'] = {}
temp_rec['payload']['data']['MessageFrame']['value']['TravelerInformation']['dataFrames']['TravelerDataFrame'] = tdf
temp_rec['payload']['data']['MessageFrame']['value']['traveler_information']['dataFrames'] = {}
temp_rec['payload']['data']['MessageFrame']['value']['traveler_information']['dataFrames']['TravelerDataFrame'] = tdf
out_recs.append(temp_rec)
else:
temp_rec = copy.deepcopy(raw_rec)
temp_rec['payload']['data']['MessageFrame']['value']['TravelerInformation']['dataFrames'] = {}
temp_rec['payload']['data']['MessageFrame']['value']['TravelerInformation']['dataFrames']['TravelerDataFrame'] = tdf
temp_rec['payload']['data']['MessageFrame']['value']['traveler_information']['dataFrames'] = {}
temp_rec['payload']['data']['MessageFrame']['value']['traveler_information']['dataFrames']['TravelerDataFrame'] = tdf
out_recs.append(temp_rec)

return [self.process(out_rec) for out_rec in out_recs if out_rec]
6 changes: 3 additions & 3 deletions sandbox_exporter/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from subprocess import Popen, PIPE


class AWS_helper(object):
class aws_helper(object):
"""
Helper class for connecting to AWS.

"""
def __init__(self, aws_profile=None, logger=False, verbose=False):
"""
Initialization function of the AWS_helper class.
Initialization function of the aws_helper class.

Parameters:
aws_profile: Optional string name of your AWS profile, as set up in
Expand Down Expand Up @@ -61,7 +61,7 @@ def _create_aws_session(self):
return session


class S3Helper(AWS_helper):
class S3Helper(aws_helper):
"""
Helper class for connecting to and working with AWS S3.

Expand Down
35 changes: 17 additions & 18 deletions sandbox_exporter/socrata_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


class SocrataDataset(object):
def __init__(self, dataset_id, socrata_client=None, socrata_params={}, float_fields=[]):
def __init__(self, dataset_id, socrata_client=None, socrata_params=none, float_fields=none):
self.dataset_id = dataset_id
self.client = socrata_client
if not socrata_client and socrata_params:
Expand Down Expand Up @@ -74,36 +74,35 @@ def calendar_date(x):
for k,v in rec.items():
if k in float_fields and k in col_dtype_dict:
out[k] = float(v)
elif k in col_dtype_dict:
if v != None and v != '':
elif k in col_dtype_dict and (v != None and v != ''):
out[k] = dtype_func.get(col_dtype_dict.get(k, 'nonexistentKey'), identity)(v)
out = {k:v for k,v in out.items() if k in col_dtype_dict}
return out

def create_new_draft(self):
draftDataset = requests.post('https://{}/api/views/{}/publication.json'.format(self.client.domain, self.dataset_id),
draft_dataset = requests.post('https://{}/api/views/{}/publication.json'.format(self.client.domain, self.dataset_id),
auth=(self.socrata_params['username'], self.socrata_params['password']),
params={'method': 'copySchema'})
logger.info(draftDataset.json())
draftId = draftDataset.json()['id']
return draftId
logger.info(draft_dataset.json())
draft_id = draft_dataset.json()['id']
return draft_id

def publish_draft(self, draftId):
def publish_draft(self, draft_id):
time.sleep(5)
publishResponse = requests.post('https://{}/api/views/{}/publication.json'.format(self.client.domain, draftId),
publish_response = requests.post('https://{}/api/views/{}/publication.json'.format(self.client.domain, draft_id),
auth=(self.socrata_params['username'], self.socrata_params['password']))
logger.info(publishResponse.json())
return publishResponse
logger.info(publish_response.json())
return publish_response

def delete_draft(self, draftId):
def delete_draft(self, draft_id):
time.sleep(5)
deleteResponse = self.client.delete(draftId)
if deleteResponse.status_code == 200:
logger.info('Empty draft {} has been discarded.'.format(draftId))
return deleteResponse
delete_response = self.client.delete(draft_id)
if delete_response.status_code == 200:
logger.info('Empty draft {} has been discarded.'.format(draft_id))
return delete_response

def clean_and_upsert(self, recs, dataset_id=None):
dataset_id = dataset_id or self.dataset_id
out_recs = [self.mod_dtype(r) for r in recs]
uploadResponse = self.client.upsert(dataset_id, out_recs)
return uploadResponse
upload_response = self.client.upsert(dataset_id, out_recs)
return upload_response