Skip to content

Commit

Permalink
fix: [analysers] apps should now work on native log of macos
Browse files Browse the repository at this point in the history
  • Loading branch information
cvandeplas committed Jun 12, 2024
1 parent d99ade0 commit 5476275
Showing 1 changed file with 31 additions and 48 deletions.
79 changes: 31 additions & 48 deletions analysers/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,63 +76,46 @@ def analyse_path(case_folder: str, output_file: str = 'apps.json') -> bool:
# list files in here
for file_in_logarchive_dir in os.listdir(file_in_dir):
file_in_logarchive_dir = os.path.join(file_in_dir, file_in_logarchive_dir)
# logarchive/logarchive.json is a multiline json - generated by native unifiedlog parser
if file_in_logarchive_dir.endswith('logarchive.json'): # apple unified log format
print(f"Found apple logarchive.json: {file_in_logarchive_dir}")
# TODO fix the parser to ensure the same result is given for native and non-native unifiedlog parser ? or just catch it this way
# try something simple
app_list = []
with open(file_in_logarchive_dir, 'rb') as f:
for entry in ijson.items(f, 'data.item'):
if 'subsystem' in entry:
if entry['subsystem'] not in app_list and '.' in entry['subsystem']:
if entry['subsystem'].startswith('pid/'):
pass
elif entry['subsystem'].startswith('user/'):
pass
else:
app_list.append(entry['subsystem'])
if entry['subsystem'] not in apps:
apps[entry['subsystem']] = {'found': ['logarchive']}
else:
apps[entry['subsystem']]['found'].append('logarchive')
else:
# mandiant unifiedlog parser is multiline json format
print(f"Found non-native logarchive file: {file_in_logarchive_dir}")
with open(file_in_logarchive_dir, 'r') as f:
for line in f: # jsonl format
# same parsing for native and mandiant unifiedlog parser, they are in multiline json format
print(f"Found logarchive file: {file_in_logarchive_dir}")
with open(file_in_logarchive_dir, 'r') as f:
for line in f: # jsonl format
try:
entry = json.loads(line)
# skip empty entries
if entry['subsystem'] == '':
continue
# extract app/bundle id or process name from the subsystem field
if not re.search(r'^' + re_bundle_id_pattern + r'$', entry['subsystem']):
# extract foo.bar.hello from the substring if it is in that format
matches = re.findall(re_bundle_id_pattern, entry['subsystem'])
except KeyError: # last line of the native logarchive.json file
continue
except json.decoder.JSONDecodeError: # last lines of the native logarchive.json file
continue
# extract app/bundle id or process name from the subsystem field
if not re.search(r'^' + re_bundle_id_pattern + r'$', entry['subsystem']):
# extract foo.bar.hello from the substring if it is in that format
matches = re.findall(re_bundle_id_pattern, entry['subsystem'])
if matches:
new_term = matches[0][0]
else:
# below are not really apps...more processes.
# TODO decide if we want to keep them or not.
matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', entry['subsystem'])
if matches:
new_term = matches[0][0]
new_term = matches[0]
else:
# below are not really apps...more processes.
# TODO decide if we want to keep them or not.
matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', entry['subsystem'])
matches = re.findall(r'^([a-zA-Z0-9-_]+)$', entry['subsystem'])
if matches:
new_term = matches[0]
else:
matches = re.findall(r'^([a-zA-Z0-9-_]+)$', entry['subsystem'])
if matches:
new_term = matches[0]
else:
# print(f"Skipping entry: {entry['subsystem']}")
continue
# print(f"New entry: {new_term} - was: {entry['subsystem']}")
entry['subsystem'] = new_term
# add it to the list
if entry['subsystem'] not in apps:
apps[entry['subsystem']] = {'found': ['logarchive']}
else:
if 'logarchive' not in apps[entry['subsystem']]['found']:
apps[entry['subsystem']]['found'].append('logarchive')
# logarchive/*.json are separate json files - generated by non-native unifiedlog parser
# print(f"Skipping entry: {entry['subsystem']}")
continue
# print(f"New entry: {new_term} - was: {entry['subsystem']}")
entry['subsystem'] = new_term
# add it to the list
if entry['subsystem'] not in apps:
apps[entry['subsystem']] = {'found': ['logarchive']}
else:
if 'logarchive' not in apps[entry['subsystem']]['found']:
apps[entry['subsystem']]['found'].append('logarchive')

with open(output_file, 'w') as f:
f.write(json.dumps(apps, indent=4))
Expand Down

0 comments on commit 5476275

Please sign in to comment.