From 5476275adf342bc750d337b3bd420f9c3ea0b2d5 Mon Sep 17 00:00:00 2001 From: Christophe Vandeplas Date: Wed, 12 Jun 2024 15:53:17 +0200 Subject: [PATCH] fix: [analysers] apps should now work on native log of macos --- analysers/apps.py | 79 +++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 48 deletions(-) diff --git a/analysers/apps.py b/analysers/apps.py index 1942a59..eca73c6 100644 --- a/analysers/apps.py +++ b/analysers/apps.py @@ -76,63 +76,46 @@ def analyse_path(case_folder: str, output_file: str = 'apps.json') -> bool: # list files in here for file_in_logarchive_dir in os.listdir(file_in_dir): file_in_logarchive_dir = os.path.join(file_in_dir, file_in_logarchive_dir) - # logarchive/logarchive.json is a multiline json - generated by native unifiedlog parser - if file_in_logarchive_dir.endswith('logarchive.json'): # apple unified log format - print(f"Found apple logarchive.json: {file_in_logarchive_dir}") - # TODO fix the parser to ensure the same result is given for native and non-native unifiedlog parser ? or just catch it this way - # try something simple - app_list = [] - with open(file_in_logarchive_dir, 'rb') as f: - for entry in ijson.items(f, 'data.item'): - if 'subsystem' in entry: - if entry['subsystem'] not in app_list and '.' in entry['subsystem']: - if entry['subsystem'].startswith('pid/'): - pass - elif entry['subsystem'].startswith('user/'): - pass - else: - app_list.append(entry['subsystem']) - if entry['subsystem'] not in apps: - apps[entry['subsystem']] = {'found': ['logarchive']} - else: - apps[entry['subsystem']]['found'].append('logarchive') - else: - # mandiant unifiedlog parser is multiline json format - print(f"Found non-native logarchive file: {file_in_logarchive_dir}") - with open(file_in_logarchive_dir, 'r') as f: - for line in f: # jsonl format + # same parsing for native and mandiant unifiedlog parser, they are in multiline json format + print(f"Found logarchive file: {file_in_logarchive_dir}") + with open(file_in_logarchive_dir, 'r') as f: + for line in f: # jsonl format + try: entry = json.loads(line) # skip empty entries if entry['subsystem'] == '': continue - # extract app/bundle id or process name from the subsystem field - if not re.search(r'^' + re_bundle_id_pattern + r'$', entry['subsystem']): - # extract foo.bar.hello from the substing if it is in that format - matches = re.findall(re_bundle_id_pattern, entry['subsystem']) + except KeyError: # last line of the native logarchive.json file + continue + except json.decoder.JSONDecodeError: # last lines of the native logarchive.json file + continue + # extract app/bundle id or process name from the subsystem field + if not re.search(r'^' + re_bundle_id_pattern + r'$', entry['subsystem']): + # extract foo.bar.hello from the substing if it is in that format + matches = re.findall(re_bundle_id_pattern, entry['subsystem']) + if matches: + new_term = matches[0][0] + else: + # below are not really apps...more processes. + # TODO decide if we want to keep them or not. + matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', entry['subsystem']) if matches: - new_term = matches[0][0] + new_term = matches[0] else: - # below are not really apps...more processes. - # TODO decide if we want to keep them or not. - matches = re.findall(r'\[([a-zA-Z0-9-_]+)\]', entry['subsystem']) + matches = re.findall(r'^([a-zA-Z0-9-_]+)$', entry['subsystem']) if matches: new_term = matches[0] else: - matches = re.findall(r'^([a-zA-Z0-9-_]+)$', entry['subsystem']) - if matches: - new_term = matches[0] - else: - # print(f"Skipping entry: {entry['subsystem']}") - continue - # print(f"New entry: {new_term} - was: {entry['subsystem']}") - entry['subsystem'] = new_term - # add it to the list - if entry['subsystem'] not in apps: - apps[entry['subsystem']] = {'found': ['logarchive']} - else: - if 'logarchive' not in apps[entry['subsystem']]['found']: - apps[entry['subsystem']]['found'].append('logarchive') - # logarchive/*.json are separate json files - generated by non-native unifiedlog parser + # print(f"Skipping entry: {entry['subsystem']}") + continue + # print(f"New entry: {new_term} - was: {entry['subsystem']}") + entry['subsystem'] = new_term + # add it to the list + if entry['subsystem'] not in apps: + apps[entry['subsystem']] = {'found': ['logarchive']} + else: + if 'logarchive' not in apps[entry['subsystem']]['found']: + apps[entry['subsystem']]['found'].append('logarchive') with open(output_file, 'w') as f: f.write(json.dumps(apps, indent=4))