Skip to content

Commit

Permalink
Merge pull request #81 from AbdouSeck/remove_play_medium
Browse files Browse the repository at this point in the history
Remove event_struct.play_medium from event record
  • Loading branch information
ichuang authored May 27, 2020
2 parents 53a140a + 73da64e commit 8a1efef
Showing 1 changed file with 80 additions and 59 deletions.
139 changes: 80 additions & 59 deletions edx2bigquery/rephrase_tracking_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# rephrase tracking log entries so they can be loaded into BigQuery
#
# The issue is that data have to be in a fixed schema. However, the tracking logs
# have many free-format fields, like "event".
# have many free-format fields, like "event".
#
# We get around this by storing such fields as strings, but keep performance
# potential high by including parsed versions of "event" for specific kinds
Expand All @@ -23,45 +23,59 @@
# - change "_id" : { "$oid" : "5330f5e9e714bdeae575db5e" } to "mongoid": "5330f5e9e714bdeae575db5e"
#
# if module_id or course_id is missing, add them.

import datetime
import gzip
import json
import os
import sys
import json
import gzip
import string
import datetime
import traceback
from math import isnan
from path import Path as path

from addmoduleid import add_module_id
from check_schema_tracking_log import check_schema

def do_rephrase(data, do_schema_check=True, linecnt=0):

def do_rephrase(data, do_schema_check=True, linecnt=0):
"""
Modify the provided data dictionary in place to rephrase
certain pieces of data for easy loading to BigQuery
:TODO: Move the inner functions outside this function.
:type: dict
:param data: A tracking log record from the edX nightly data files
:type: bool
:param do_schema_check: Whether or not the provided record should be checked
against the target schema
:type: int
:param linecnt: Some line count value
:rtype: None
:return: Nothing is returned since the data parameter is modified in place
"""
# add course_id?
if 'course_id' not in data:
cid = data.get('context',{}).get('course_id','')
cid = data.get('context', {}).get('course_id', '')
if cid:
data['course_id'] = cid

# add module_id?
if 'module_id' not in data:
add_module_id(data)

if not "event" in data:
data['evant'] = ""
data['event'] = ""

# ensure event is dict when possible
if not 'event_js' in data:
event = data.get('event')
try:
if not type(event)==dict:
if not isinstance(event, dict):
event = json.loads(event)
event_js = True
except Exception as err:
# note - do not erase event even if it can't be loaded as JSON: see how it becomes JSONified below
event_js = False

data['event'] = event
data['event_js'] = event_js

Expand All @@ -75,21 +89,20 @@ def do_rephrase(data, do_schema_check=True, linecnt=0):
# "event", e.g. for problem_* event types.
#
# store that parsed version as "event_struct"

event = None
if 'event' in data:
event = data['event']
data['event'] = json.dumps(data['event'])

# now the real rephrasing

event_type = data.get('event_type', None)
event_type = data.get('event_type', '')

#----------------------------------------
# types to keep

KNOWN_TYPES = ['play_video', 'seq_goto', 'seq_next', 'seq_prev',
'seek_video', 'load_video',
KNOWN_TYPES = ['play_video', 'seq_goto', 'seq_next', 'seq_prev',
'seek_video', 'load_video',
'save_problem_success',
'save_problem_fail',
'reset_problem_success',
Expand All @@ -100,21 +113,22 @@ def do_rephrase(data, do_schema_check=True, linecnt=0):
'edx.course.enrollment.mode_changed',
'edx.course.enrollment.upgrade.succeeded',
'speed_change_video',
'problem_check',
'problem_save',
'problem_check',
'problem_save',
'problem_reset'
]

if (type(event)==dict and (('problem_' in event_type)
or event_type in KNOWN_TYPES)
and not ('video_embedded' in event_type
or 'harvardx.button' in event_type
or 'harvardx.' in event_type
)):
data['event_struct'] = event
elif type(event)==dict: # default to always including GET and POST when available
data['event_struct'] = {'GET': json.dumps(event.get('GET')), 'POST': json.dumps(event.get('POST'))}
data['event_struct']['query'] = event.get('query')
if isinstance(event, dict):
outs = ('video_embedded', 'harvardx.button', 'harvardx.')
out_conds = not any(k in event_type for k in outs)
in_conds = 'problem_' in event_type or event_type in KNOWN_TYPES
if in_conds and out_conds:
data['event_struct'] = event
else:
data['event_struct'] = {
'GET': json.dumps(event.get('GET')),
'POST': json.dumps(event.get('POST')),
'query': event.get('query'),
}
else:
if 'event_struct' in data:
data.pop('event_struct')
Expand All @@ -126,24 +140,24 @@ def do_rephrase(data, do_schema_check=True, linecnt=0):
data['mongoid'] = data['_id']['$oid']
data.pop('_id')

if type(event)==dict and 'POST' in event:
if isinstance(event, dict) and 'POST' in event:
post_str = json.dumps(event['POST'])
event['POST'] = post_str

if type(event)==dict and 'GET' in event:
if isinstance(event, dict) and 'GET' in event:
get_str = json.dumps(event['GET'])
event['GET'] = get_str

if event_type in ['problem_check', 'problem_save', 'problem_reset'] and data['event_source']=='browser':
if type(event) in [str, unicode]:
if isinstance(event, (str, unicode)):
event = {'data': json.dumps(event)}

if type(event) in [str, unicode]:
if isinstance(event, (str, unicode)):
#if event and data['event_js']:
# sys.stderr.write('unexpected STRING event: ' + json.dumps(data, indent=4) + '\n')
event = {'data': json.dumps(event)}

if type(event) in [list]:
if isinstance(event, (list,)):
event = {'data': json.dumps(event)}

def make_str(key):
Expand Down Expand Up @@ -182,7 +196,7 @@ def move_unknown_fields_from_context_to_context_agent(keys): # needed to handle
agent = {'oldagent': context.get('agent', "")}
for key in keys:
if '.' in key:
(prefix, subkey) = key.split('.',1)
(prefix, subkey) = key.split('.', 1)
if prefix in context:
subcontext = context[prefix]
if subkey in subcontext:
Expand All @@ -197,12 +211,12 @@ def move_unknown_fields_from_context_to_context_agent(keys): # needed to handle
# 31-Jan-15: handle new "module.usage_key" field in context, e.g.:
#
# "module": {
# "display_name": "Radiation Exposure",
# "display_name": "Radiation Exposure",
# "usage_key": "i4x://MITx/6.00.1x_5/problem/ps03:ps03-Radiation-Exposure"
# },
# },
# 28-May-16: context.asides

mobile_api_context_fields = ['application', 'client', 'received_at', 'component', "open_in_browser_url",
mobile_api_context_fields = ['application', 'client', 'received_at', 'component', "open_in_browser_url",
"module.usage_key",
"module.original_usage_version",
"module.original_usage_key",
Expand All @@ -211,7 +225,7 @@ def move_unknown_fields_from_context_to_context_agent(keys): # needed to handle
move_unknown_fields_from_context_to_context_agent(mobile_api_context_fields)

#----------------------------------------
# new fields which are not in schema get moved as JSON strings to the pre-existing "mongoid" field,
# new fields which are not in schema get moved as JSON strings to the pre-existing "mongoid" field,
# which is unused except in very old records
# do this, for example, for the "referer" and "accept_language" fields

Expand All @@ -221,7 +235,7 @@ def move_fields_to_mongoid(field_paths):
Move that field, with the path intact, into the mongoid field.
'''
mongoid = data.get('mongoid')
if not type(mongoid)==dict:
if not isinstance(mongoid, dict):
mongoid = {'old_mongoid' : mongoid}

def move_field_value(ddict, vdict, fp):
Expand All @@ -234,16 +248,13 @@ def move_field_value(ddict, vdict, fp):
ddict.pop(key) # remove key from current path within data dict
return fval
return None

if not key in vdict:
vdict[key] = {}

return move_field_value(ddict.get(key, {}), vdict[key], fp[1:])

vdict = mongoid
for field_path in field_paths:
move_field_value(data, vdict, field_path)

data['mongoid'] = json.dumps(vdict)

# 16-Mar-15: remove event_struct.requested_skip_interval
Expand All @@ -267,6 +278,7 @@ def move_field_value(ddict, vdict, fp):
['environment'], # 06sep2017 rp
['minion_id'], # 06sep2017 rp
['event_struct', 'duration'], # 22nov2017 ic
['event_struct', 'play_medium']
])

#----------------------------------------
Expand All @@ -284,8 +296,8 @@ def fix_dash(key):
def check_empty(data, *keys):
# print "--> keys=%s, data=%s" % (str(keys), data)
key = keys[0]
if type(data)==dict and key in data:
if len(keys)==1:
if isinstance(data, dict) and key in data:
if len(keys) == 1:
if data[key] in ["", u'']:
# print "---> popped %s" % key
data.pop(key)
Expand All @@ -308,42 +320,51 @@ def string_is_float(s):
except ValueError:
return False

if data.get('event_type')=='speed_change_video':
if data.get('event_type') == 'speed_change_video':
if 'event_struct' in data and 'new_speed' in data['event_struct']:
# First check if string is float
if string_is_float(data['event_struct']['new_speed']):
# Second check if value is null
if isnan(float(data['event_struct']['new_speed'])):
data['event_struct'].pop('new_speed')


# check for any funny keys, recursively
funny_key_sections = []
def check_for_funny_keys(entry, name='toplevel'):
for key, val in entry.iteritems():
if key.startswith('i4x-') or key.startswith('xblock.'):
sys.stderr.write("[rephrase] oops, funny key at %s in entry: %s, data=%s\n" % (name, entry, ''))
sys.stderr.write(
"[rephrase] oops, funny key at %s in entry: %s, data=%s\n" % (name, entry, '')
)
funny_key_sections.append(name)
return True
if len(key)>25:
sys.stderr.write("[rephrase] suspicious key at %s in entry: %s, data=%s\n" % (name, entry, ''))

if key[0] in '0123456789':
sys.stderr.write("[rephrase] oops, funny key at %s in entry: %s, data=%s\n" % (name, entry, ''))
if len(key) > 25:
sys.stderr.write(
"[rephrase] suspicious key at %s in entry: %s, data=%s\n" % (name, entry, '')
)

if key[0].isdigit():
sys.stderr.write(
"[rephrase] oops, funny key at %s in entry: %s, data=%s\n" % (name, entry, '')
)
funny_key_sections.append(name)
return True

if '-' in key or '.' in key:
# bad key name! rename it, chaning "-" to "_"
newkey = key.replace('-','_').replace('.','__')
sys.stderr.write("[rephrase] oops, bad keyname at %s in entry: %s newkey+%s\n" % (name, entry, newkey))
sys.stderr.write(
"[rephrase] oops, bad keyname at %s in entry: %s newkey+%s\n" % (name, entry, newkey)
)
entry[newkey] = val
entry.pop(key)
key = newkey
if type(val)==dict:
if isinstance(val, dict):
ret = check_for_funny_keys(val, name + '/' + key)
if ret is True:
sys.stderr.write(" coercing section %s to become a string\n" % (name+"/"+key) )
sys.stderr.write(
" coercing section %s to become a string\n" % (name+"/"+key)
)
entry[key] = json.dumps(val)
return False

Expand All @@ -370,7 +391,6 @@ def do_rephrase_line(line, linecnt=0):
sys.stderr.write('[%d] oops, err=%s, bad log line %s\n' % (linecnt, str(err), line))
sys.stderr.write(traceback.format_exc())
return

return json.dumps(data)+'\n'


Expand All @@ -394,10 +414,11 @@ def do_rephrase_file(fn):
ofp.write(newline)

ofp.close()

oldfilename = fn.dirname() / ("old-" + fn.basename())
print " --> Done; renaming %s -> %s" % (fn, oldfilename)
os.rename(fn, oldfilename)
print " --> renaming %s -> %s" % (ofn, fn)
os.rename(ofn, fn)
sys.stdout.flush()

0 comments on commit 8a1efef

Please sign in to comment.