chg: [analysers] migrate apps parser to new model
cvandeplas committed Jun 7, 2024
1 parent 0a0e258 commit f5632a7
Showing 9 changed files with 185 additions and 182 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -20,6 +20,9 @@ Create a virtual environment:
sudo apt install graphviz
```

On Linux systems you may wish to install the unifiedlog parser. See below for instructions on how to do this.


# Quickstart

Add new sysdiagnose case
5 changes: 2 additions & 3 deletions analyse.py
@@ -65,10 +65,9 @@ def analyse(analyser, caseid):

# building command
parse_data_path = "%s/%s/" % (config.parsed_data_folder, caseid)
# TODO consider outputting analysers' output to a different folder defined in config.py
output_file = os.path.join(config.parsed_data_folder, caseid, analyser + "." + module.analyser_format)
command = "module.%s('%s', '%s')" % (module.analyser_call, parse_data_path, output_file)
result = eval(command)

result = module.analyse_path(case_folder=parse_data_path, output_file=output_file)
print(f'Execution success, output saved in: {output_file}', file=sys.stderr)

return 0
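
With this change, analyse.py no longer builds a command string and `eval()`s it; it imports the analyser module and calls its `analyse_path()` function directly. A minimal sketch of the new calling convention, where the package import path and case folder layout are assumptions for illustration:

```python
# Sketch of the new analyser calling convention introduced in this commit.
# The import path and folder names are illustrative assumptions.
import os
from analysers import apps as module

case_folder = os.path.join('parsed_data', 'case1')   # hypothetical parsed-data folder for one case
output_file = os.path.join(case_folder, f'apps.{module.analyser_format}')
module.analyse_path(case_folder=case_folder, output_file=output_file)
print(f'Execution success, output saved in: {output_file}')
```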
226 changes: 123 additions & 103 deletions analysers/apps.py
@@ -3,127 +3,147 @@
# For Python3
# Author: Emiliern Le Jamtel

"""Apps analyser.
Usage:
apps.py -i <logfolder>
apps.py (-h | --help)
apps.py --version
Options:
-h --help Show this screen.
-v --version Show version.
"""
import os
# from optparse import OptionParser
import json
import ijson
from docopt import docopt
# import glob

version_string = "sysdiagnose-demo-analyser.py v2023-04-28 Version 0.1"
import re

# ----- definition for analyse.py script -----#
# ----- DO NOT DELETE ----#
analyser_description = 'Get list of Apps installed on the device'
analyser_format = 'json'

analyser_description = "Get list of Apps installed on the device"
analyser_call = "apps_analysis"
analyser_format = "md"
uses_parsers = ['accessibility_tcc', 'brctl', 'itunesstore', 'logarchive'] # not used yet, but just interesting to keep track

# --------------------------------------------------------------------------- #
# TODO this code is terribly slow. I would expect this is due to all the if key in lookups. It took 49 seconds for case 1


def apps_analysis(jsondir, filename):
"""
def analyse_path(case_folder: str, output_file: str = 'apps.json') -> bool:
'''
Go through all json files in the folder and generate the markdown file
"""
'''

apps = {}
# TODO add a check to see if the files exist, and if necessary call the parsers (or ask the user to call them), or maybe use a flag in the function call

for file_in_dir in os.listdir(case_folder):
file_in_dir = os.path.join(case_folder, file_in_dir)

for jsonfile in os.listdir(jsondir):
jsonfile = os.path.join(jsondir, jsonfile)
# building data depending on the source
if jsonfile.endswith('accessibility-tcc.json'):
with open(jsonfile, 'r') as f:
accessibility_data = json.load(f)
for entry in accessibility_data['access']:
if entry['client'] not in apps.keys():
apps[entry['client']] = {"found": ['accessibility-tcc'], "services": [entry['service']]}
if file_in_dir.endswith('accessibility_tcc.json'):
with open(file_in_dir, 'r') as f:
json_data = json.load(f)
if not json_data or json_data.get('error'):
continue
for entry in json_data['access']:
if entry['client'] not in apps:
apps[entry['client']] = {'found': ['accessibility-tcc'], 'services': [entry['service']]}
else:
apps[entry['client']]["services"].append(entry['service'])
elif jsonfile.endswith('brctl.json'):
with open(jsonfile, 'r') as f:
brctl_data = json.load(f)
try:
apps[entry['client']]['services'].append(entry['service'])
except KeyError:
apps[entry['client']]['services'] = [entry['service']]

elif file_in_dir.endswith('brctl.json'):
with open(file_in_dir, 'r') as f:
json_data = json.load(f)
if not json_data or json_data.get('error'):
continue
# directly going to the list of apps
for entry in brctl_data['app_library_id'].keys():
if entry not in apps.keys():
apps[entry] = {"found": ['brctl'], "libraries": brctl_data['app_library_id'][entry]}
for entry in json_data['app_library_id']:
if entry not in apps:
apps[entry] = {'found': ['brctl'], 'libraries': json_data['app_library_id'][entry]}
else:
apps[entry]["libraries"] = brctl_data['app_library_id'][entry]
apps[entry]["found"].append('brctl')
elif jsonfile.endswith('itunesstore.json'):
with open(jsonfile, 'r') as f:
itunesstore_data = json.load(f)
try:
apps[entry]['libraries'] = json_data['app_library_id'][entry]
except KeyError:
apps[entry['client']]['libraries'] = json_data['app_library_id'][entry]

apps[entry]['found'].append('brctl')

elif file_in_dir.endswith('itunesstore.json'):
with open(file_in_dir, 'r') as f:
json_data = json.load(f)
if not json_data or json_data.get('error'):
continue
# directly going to the list of apps
for entry in itunesstore_data['application_id']:
if entry['bundle_id'] not in apps.keys():
apps[entry['bundle_id']] = {"found": ['itunesstore']}
for entry in json_data['application_id']:
if entry['bundle_id'] not in apps:
apps[entry['bundle_id']] = {'found': ['itunesstore']}
else:
apps[entry['bundle_id']]["found"].append('itunesstore')
elif jsonfile.endswith('logarchive.json'): # FIXME fix the parser to ensure the same result is given for native and non-native unifiedlog parser
# try something simple
app_list = []
with open(jsonfile, 'rb') as f:
for entry in ijson.items(f, 'data.item'):
if 'subsystem' in entry.keys():
if entry['subsystem'] not in app_list and '.' in entry['subsystem']:
if entry['subsystem'].startswith('pid/'):
pass
elif entry['subsystem'].startswith('user/'):
pass
apps[entry['bundle_id']]['found'].append('itunesstore')

if file_in_dir.endswith('logarchive'):
re_bundle_id_pattern = r'(([a-zA-Z0-9-_]+\.)+[a-zA-Z0-9-_]+)'
# list files in here
for file_in_logarchive_dir in os.listdir(file_in_dir):
file_in_logarchive_dir = os.path.join(file_in_dir, file_in_logarchive_dir)
# logarchive/logarchive.json is a ijson multiline json - generated by native unifiedlog parser
if file_in_logarchive_dir.endswith('logarchive.json'): # apple unified log format
print(f"Found apple logarchive.json: {file_in_logarchive_dir}")
# TODO fix the parser to ensure the same result is given for native and non-native unifiedlog parser ? or just catch it this way
# try something simple
app_list = []
with open(file_in_logarchive_dir, 'rb') as f:
for entry in ijson.items(f, 'data.item'):
if 'subsystem' in entry:
if entry['subsystem'] not in app_list and '.' in entry['subsystem']:
if entry['subsystem'].startswith('pid/'):
pass
elif entry['subsystem'].startswith('user/'):
pass
else:
app_list.append(entry['subsystem'])
if entry['subsystem'] not in apps:
apps[entry['subsystem']] = {'found': ['logarchive']}
else:
apps[entry['subsystem']]['found'].append('logarchive')
elif file_in_logarchive_dir.endswith('dataFoundInMultipleLogFiles.json'):
# FIXME see https://github.com/mandiant/macos-UnifiedLogs/blob/main/examples/unifiedlog_parser_json/src/main.rs#L342
# Long story short:
# - invalid json, ['a']['b'] is not valid json, should be ['a', 'b']
# - or multiline json
# - potential workaround is to make a dirty function to parse, take the offset of the error, extract, parse again, and so on
# See also https://github.com/mandiant/macos-UnifiedLogs/issues/16
continue
else:
print(f"Non-native file in logarchive folder: {file_in_logarchive_dir}")
with open(file_in_logarchive_dir, 'r') as f:
json_data = json.load(f)
# directly going to the list of apps
for entry in json_data:
# skip empty entries
if entry['subsystem'] == '':
continue
# extract app/bundle id or process name from the subsystem field
if not re.search(r'^' + re_bundle_id_pattern + r'$', entry['subsystem']):
# extract foo.bar.hello from the substring if it is in that format
matches = re.findall(re_bundle_id_pattern, entry['subsystem'])
if matches:
new_term = matches[0][0]
else:
app_list.append(entry['subsystem'])
if entry['subsystem'] not in apps.keys():
apps[entry['subsystem']] = {"found": ['logarchive']}
# below are not really apps...more processes.
# TODO decide if we want to keep them or not.
matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', entry['subsystem'])
if matches:
new_term = matches[0]
else:
apps[entry['subsystem']]["found"].append('logarchive')
print(json.dumps(apps, indent=4))

matches = re.findall(r'^([a-zA-Z0-9-_]+)$', entry['subsystem'])
if matches:
new_term = matches[0]
else:
# print(f"Skipping entry: {entry['subsystem']}")
continue
# print(f"New entry: {new_term} - was: {entry['subsystem']}")
entry['subsystem'] = new_term
# add it to the list
if entry['subsystem'] not in apps:
apps[entry['subsystem']] = {'found': ['logarchive']}
else:
if 'logarchive' not in apps[entry['subsystem']]['found']:
apps[entry['subsystem']]['found'].append('logarchive')
# logarchive/*.json are separate json files - generated by non-native unifiedlog parser

with open(output_file, 'w') as f:
f.write(json.dumps(apps, indent=4))
print(f"Apps list written to {output_file}")
# print(json.dumps(apps, indent=4))
return


# --------------------------------------------------------------------------- #
"""
Main function
"""


def main():
"""
Main function, to be called when used as CLI tool
"""

arguments = docopt(__doc__, version='parser for com.apple.wifi plist files v0.2')

if arguments['-i']:
# go through the json files in the folder
json_files = []
for file in os.listdir(arguments['<logfolder>']):
if file.endswith(".json"):
json_files.append(os.path.join(arguments['<logfolder>'], file))
# call the function to generate the apps analysis
apps_analysis(json_files, 'tmp.md')

# --------------------------------------------------------------------------- #


"""
Call main function
"""

if __name__ == "__main__":

# Create an instance of the Analysis class (called "base") and run main
main()

# That's all folks ;)
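
One note on the logarchive branch: before an app is counted, the subsystem string is normalised with the bundle-id regular expression above. A standalone sketch of that extraction, reusing the same patterns as `analyse_path()`; the function name and sample subsystem strings are made up for illustration:

```python
import re

# Same bundle-id pattern as in analyse_path(): dot-separated identifier parts.
re_bundle_id_pattern = r'(([a-zA-Z0-9-_]+\.)+[a-zA-Z0-9-_]+)'


def extract_app_id(subsystem: str) -> str | None:
    """Reduce a unified-log subsystem string to a bundle id or bare process name."""
    if re.search(r'^' + re_bundle_id_pattern + r'$', subsystem):
        return subsystem                                   # already a clean bundle id
    matches = re.findall(re_bundle_id_pattern, subsystem)
    if matches:
        return matches[0][0]                               # bundle id embedded in a longer string
    matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', subsystem)
    if matches:
        return matches[0]                                  # process name in square brackets
    matches = re.findall(r'^([a-zA-Z0-9-_]+)$', subsystem)
    return matches[0] if matches else None                 # bare process name, else skip the entry


# Made-up subsystem strings, for illustration only:
print(extract_app_id('com.apple.locationd'))    # -> com.apple.locationd
print(extract_app_id('[backboardd]'))           # -> backboardd
```

Each identifier that survives this normalisation becomes a key in the `apps` dict, with `found` recording which parsers reported it.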
65 changes: 3 additions & 62 deletions analysers/demo_analyser.py
@@ -2,78 +2,19 @@

# For Python3
# DEMO - Skeleton
# Author: [email protected]

import os
import sys
import json
from datetime import datetime
from optparse import OptionParser

version_string = "sysdiagnose-demo-analyser.py v2023-04-28 Version 0.1"

# ----- definition for analyse.py script -----#
# ----- DO NOT DELETE ----#

analyser_description = "Do something useful (DEMO)"
analyser_call = "generate_something"
analyser_format = "json"


def generate_something(jsondir, filename):
def analyse_path(case_folder: str, output_file: str = "demo-analyser.json") -> bool:
"""
Generate the timeline and save it to filename
"""
print("DO SOMETHING HERE")
with open(output_file, 'w') as f:
json.dump({"Hello": "World"}, f, indent=4)
return


# --------------------------------------------------------------------------- #
"""
Main function
"""
def main():

if sys.version_info[0] < 3:
print("Must be using Python 3! Exiting ...")
exit(-1)

print(f"Running {version_string}\n")

usage = "\n%prog -d JSON directory\n"

parser = OptionParser(usage=usage)
parser.add_option("-d", dest="inputdir",
action="store", type="string",
help="Directory containing JSON from parsers")
parser.add_option("-o", dest="outputfile",
action="store", type="string",
help="JSON file to save output")
(options, args) = parser.parse_args()

# no arguments given by user, print help and exit
if len(sys.argv) == 1:
parser.print_help()
sys.exit(-1)

if options.inputdir:
# do something
if options.outputfile:
print("Hello World")
else:
print("Something else")
else:
print("WARNING -i option is mandatory!")


# --------------------------------------------------------------------------- #

"""
Call main function
"""
if __name__ == "__main__":

# Create an instance of the Analysis class (called "base") and run main
main()

# That's all folks ;)
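
The demo analyser is now the smallest example of the new model: a module only needs to expose `analyser_description`, `analyser_format` and `analyse_path()` to be usable from analyse.py. A quick interactive check could look like this, assuming `analysers` is importable as a package and using made-up paths:

```python
# Exercising the new-model demo analyser directly; the case folder is hypothetical.
from analysers import demo_analyser

demo_analyser.analyse_path(case_folder='parsed_data/case1',
                           output_file='parsed_data/case1/demo_analyser.json')
# As implemented above, this writes {"Hello": "World"} to the output file.
```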
8 changes: 8 additions & 0 deletions parsers/accessibility_tcc.py
@@ -8,6 +8,7 @@
import glob
import os
import utils.misc as misc
import json

version_string = "sysdiagnose-Accessibility-TCC.py v2020-20-20 Version 1.0"

@@ -33,3 +34,10 @@ def parse_path(path: str) -> list | dict:
return misc.json_serializable(sqlite2json.sqlite2struct(get_log_files(path)[0]))
except IndexError:
return {'error': 'No TCC.db file found in logs/Accessibility/ directory'}


def parse_path_to_folder(path: str, output_folder: str) -> bool:
result = parse_path(path)
output_file = os.path.join(output_folder, f"{__name__.split('.')[-1]}.json")
with open(output_file, 'w') as f:
json.dump(result, f, indent=4)
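
The parser side gains a matching convention: `parse_path()` keeps returning the parsed structure, while the new `parse_path_to_folder()` writes it to `<module name>.json` in the output folder, which is the filename the apps analyser matches on (`accessibility_tcc.json`). A usage sketch, where both paths are assumptions:

```python
# Illustrative use of the new parser convention; the sysdiagnose paths are made up.
from parsers import accessibility_tcc

# Returns the parsed data, or {'error': ...} when logs/Accessibility/TCC.db is missing.
data = accessibility_tcc.parse_path('data/case1/extracted_sysdiagnose')

# Writes the same result to parsed_data/case1/accessibility_tcc.json,
# the file name that analyse_path() in analysers/apps.py looks for.
accessibility_tcc.parse_path_to_folder('data/case1/extracted_sysdiagnose',
                                       output_folder='parsed_data/case1')
```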
7 changes: 7 additions & 0 deletions parsers/brctl.py
@@ -25,6 +25,13 @@ def parse_path(path: str) -> list | dict:
return {'error': 'No brctl folder found'}


def parse_path_to_folder(path: str, output_folder: str) -> bool:
result = parse_path(path)
output_file = os.path.join(output_folder, f"{__name__.split('.')[-1]}.json")
with open(output_file, 'w') as f:
json.dump(result, f, indent=4)


def parselistfile(container_list_file):
containers = {"containers": []}
result = []