diff --git a/parsers/containermanager.py b/parsers/containermanager.py index b8a86d2..94bf978 100644 --- a/parsers/containermanager.py +++ b/parsers/containermanager.py @@ -20,7 +20,7 @@ import glob import json import os -import re +from utils import multilinelog # ----- definition for parsing.py script -----# @@ -43,91 +43,9 @@ def get_log_files(log_root_path: str) -> list: return log_files -# function copied from https://github.com/abrignoni/iOS-Mobile-Installation-Logs-Parser/blob/master/mib_parser.sql.py -# Month to numeric with leading zero when month < 10 function -# Function call: month = month_converter(month) - - -def month_converter(month): - months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] - month = months.index(month) + 1 - if (month < 10): - month = f"{month:02d}" - return month - -# Day with leading zero if day < 10 function -# Functtion call: day = day_converter(day) - - -def day_converter(day): - day = int(day) - if (day < 10): - day = f"{day:02d}" - return day -## - - def parsecontainermanager(loglist): - events = {"events": []} for logfile in loglist: - with open(logfile, 'r', encoding="utf-8") as f: - # multiline parsing with the following logic: - # - build an entry with the seen lines - # - upon discovery of a new entry, or the end of the file, consider the entry as complete and process the lines - # - discovery of a new entry is done based on the timestamp, as each new entry starts this way - prev_lines = [] - for line in f: - timeregex = re.search(r"(?<=^)(.*?)(?= \[[0-9]+)", line) # Regex for timestamp - if timeregex: - # new entry, process the previous entry - if prev_lines: - new_entry = buildlogentry(''.join(prev_lines)) - events['events'].append(new_entry) - # build the new entry - prev_lines = [] - prev_lines.append(line) - else: - # not a new entry, add the line to the previous entry - prev_lines.append(line) - # process the last entry - new_entry = buildlogentry(''.join(prev_lines)) - 
events['events'].append(new_entry) - return events - - -def buildlogentry(line): - entry = {} - # timestamp - timeregex = re.search(r"(?<=^)(.*?)(?= \[[0-9]+)", line) # Regex for timestamp - if timeregex: - timestamp = timeregex.group(1) - weekday, month, day, time, year = (str.split(timestamp[:24])) - day = day_converter(day) - month = month_converter(month) - entry['timestamp'] = str(year) + '-' + str(month) + '-' + str(day) + ' ' + str(time) - - # log level - loglevelregex = re.search(r"\<(.*?)\>", line) - entry['loglevel'] = loglevelregex.group(1) - - # hex_ID - hexIDregex = re.search(r"\(0x(.*?)\)", line) - entry['hexID'] = '0x' + hexIDregex.group(1) - - # event_type - eventyperegex = re.search(r"\-\[(.*)(\]\:)", line) - if eventyperegex: - entry['event_type'] = eventyperegex.group(1) - - # msg - if 'event_type' in entry: - msgregex = re.search(r"\]\:(.*)", line, re.MULTILINE | re.DOTALL) - entry['msg'] = msgregex.group(1).strip() - else: - msgregex = re.search(r"\)\ (.*)", line, re.MULTILINE | re.DOTALL) - entry['msg'] = msgregex.group(1).strip() - - return entry + return multilinelog.extract_from_file(logfile) def main(): diff --git a/parsers/mobileactivation.py b/parsers/mobileactivation.py index be4fdcc..043a272 100644 --- a/parsers/mobileactivation.py +++ b/parsers/mobileactivation.py @@ -19,9 +19,8 @@ from docopt import docopt import glob import json -import misc import os -import re +from utils import multilinelog # ----- definition for parsing.py script -----# # ----- DO NOT DELETE ----# @@ -44,130 +43,9 @@ def get_log_files(log_root_path: str) -> list: return log_files -# function copied from https://github.com/abrignoni/iOS-Mobile-Installation-Logs-Parser/blob/master/mib_parser.sql.py -# Month to numeric with leading zero when month < 10 function -# Function call: month = month_converter(month) - - -def month_converter(month): - months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] - month = months.index(month) 
+ 1 - if (month < 10): - month = f"{month:02d}" - return month - -# Day with leading zero if day < 10 function -# Functtion call: day = day_converter(day) - - -def day_converter(day): - day = int(day) - if (day < 10): - day = f"{day:02d}" - return day -## - - def parsemobactiv(loglist): - events = {"events": []} for logfile in loglist: - with open(logfile, 'r', encoding='utf8') as f: - status = None # status tracker for multiline parsing - for line in f: - # Activation multiline parsing - if not status and "____________________ Mobile Activation Startup _____________________" in line: - status = 'act_start' - act_lines = [] - elif status == 'act_start' and "____________________________________________________________________" in line: - status = None - events['events'].append(buildlogentry_actentry(act_lines)) - elif status == 'act_start': - act_lines.append(line.strip()) - # plist multiline parsing - elif line.strip().endswith(":"): # next line will be starting with ': # end of plist - status = None - # end of plist, now need to parse the line and plist - event = buildlogentry_other(plist_lines['line']) - event['plist'] = misc.load_plist_string_as_json(b''.join(plist_lines['plist'])) - # LATER parse the plist - # - extract the recursive plist - # - decode the certificates into nice JSON - # - and so on with more fun for the future - events['events'].append(event) - elif line.strip() != '': - events['events'].append(buildlogentry_other(line.strip())) - # print(json.dumps(events,indent=4)) - return events - - -def buildlogentry_actentry(lines): - # print(lines) - event = {'loglevel': 'debug'} - # get timestamp - timeregex = re.search(r"(?<=^)(.*?)(?= \[)", lines[0]) - timestamp = timeregex.group(1) - weekday, month, day, time, year = (str.split(timestamp)) - day = day_converter(day) - month = month_converter(month) - event['timestamp'] = str(year) + '-' + str(month) + '-' + str(day) + ' ' + str(time) - - # hex_ID - hexIDregex = re.search(r"\(0x(.*?)\)", lines[0]) - 
event['hexID'] = '0x' + hexIDregex.group(1) - - # build event - for line in lines: - splitted = line.split(":") - if len(splitted) > 1: - event[splitted[-2].strip()] = splitted[-1].strip() - - return event - - -def buildlogentry_other(line): - event = {} - try: - # get timestamp - timeregex = re.search(r"(?<=^)(.*?)(?= \[)", line) - timestamp = timeregex.group(1) - weekday, month, day, time, year = (str.split(timestamp)) - day = day_converter(day) - month = month_converter(month) - event['timestamp'] = str(year) + '-' + str(month) + '-' + str(day) + ' ' + str(time) - - # log level - loglevelregex = re.search(r"\<(.*?)\>", line) - event['loglevel'] = loglevelregex.group(1) - - # hex_ID - hexIDregex = re.search(r"\(0x(.*?)\)", line) - event['hexID'] = '0x' + hexIDregex.group(1) - - # event_type - eventyperegex = re.search(r"\-\[(.*)(\]\:)", line) - if eventyperegex: - event['event_type'] = eventyperegex.group(1) - - # msg - if 'event_type' in event: - msgregex = re.search(r"\]\:(.*)", line) - event['msg'] = msgregex.group(1).strip() - else: - msgregex = re.search(r"\)\ (.*)", line) - event['msg'] = msgregex.group(1).strip() - except Exception as e: - print(f"Error parsing line: {line}. 
Reason: {str(e)}") - raise Exception from e - - return event + return multilinelog.extract_from_file(logfile) def main(): diff --git a/parsers/mobileinstallation.py b/parsers/mobileinstallation.py index 3e52f89..2837ce3 100644 --- a/parsers/mobileinstallation.py +++ b/parsers/mobileinstallation.py @@ -20,8 +20,7 @@ import glob import json import os -import re - +from utils import multilinelog # ----- definition for parsing.py script -----# # ----- DO NOT DELETE ----# @@ -44,95 +43,13 @@ def get_log_files(log_root_path: str) -> list: return log_files -# function copied from https://github.com/abrignoni/iOS-Mobile-Installation-Logs-Parser/blob/master/mib_parser.sql.py -# Month to numeric with leading zero when month < 10 function -# Function call: month = month_converter(month) - - -def month_converter(month): - months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] - month = months.index(month) + 1 - if (month < 10): - month = f"{month:02d}" - return month - -# Day with leading zero if day < 10 function -# Functtion call: day = day_converter(day) - - -def day_converter(day): - day = int(day) - if (day < 10): - day = f"{day:02d}" - return day -## - - def parsemobinstall(loglist): events = {"events": []} for logfile in loglist: - with open(logfile, 'r', encoding='utf8') as f: - prev_lines = [] - for line in f: - line = line.strip() - # support multiline entries - if line.endswith('{'): - prev_lines.append(line) - continue - if prev_lines: - prev_lines.append(line) - if line.endswith('}'): - line = '\n'.join(prev_lines) - prev_lines = [] - else: - continue - # normal or previously multiline entry - # getting Timestamp - adding entry only if timestamp is present - timeregex = re.search(r"(?<=^)(.*)(?= \[)", line) # Regex for timestamp - if timeregex: - new_entry = buildlogentry(line) - events['events'].append(new_entry) + return multilinelog.extract_from_file(logfile) return events -def buildlogentry(line): - try: - entry = {} - # 
timestamp - timeregex = re.search(r"(?<=^)(.*?)(?= \[[0-9]+)", line) # Regex for timestamp - timestamp = timeregex.group(1) - weekday, month, day, time, year = (str.split(timestamp)) - day = day_converter(day) - month = month_converter(month) - entry['timestamp'] = str(year) + '-' + str(month) + '-' + str(day) + ' ' + str(time) - - # log level - loglevelregex = re.search(r"\<(.*?)\>", line) - entry['loglevel'] = loglevelregex.group(1) - - # hex_ID - hexIDregex = re.search(r"\(0x(.*?)\)", line) - entry['hexID'] = '0x' + hexIDregex.group(1) - - # event_type - eventyperegex = re.search(r"\-\[(.*)(\]\:)", line) - if eventyperegex: - entry['event_type'] = eventyperegex.group(1) - - # msg - if 'event_type' in entry: - msgregex = re.search(r"\]\:(.*)", line) - entry['msg'] = msgregex.group(1).strip() - else: - msgregex = re.search(r"\)\ (.*)", line) - entry['msg'] = msgregex.group(1).strip() - except Exception as e: - print(f"Error parsing line: {line}. Reason: {str(e)}") - raise Exception from e - - return entry - - def main(): """ Main function, to be called when used as CLI tool diff --git a/tests/test_multilinelog.py b/tests/test_multilinelog.py new file mode 100644 index 0000000..b493e24 --- /dev/null +++ b/tests/test_multilinelog.py @@ -0,0 +1,193 @@ +from tests import SysdiagnoseTestCase +from utils import multilinelog +import unittest + + +class TestMultiline(SysdiagnoseTestCase): + + def test_multilinelog_plist(self): + s = '''Wed May 24 12:58:04 2023 [173] (0x16bf9b000) MA: -[MobileActivationDaemon handleActivationInfoWithSession:activationSignature:completionBlock:]: Activation message: + + + + +AccountToken + +dGVzdA== + +AccountTokenCertificate + +dGVzdA== + +unbrick + + + + +''' + expected_result = { + 'timestamp': '2023-05-24 12:58:04', + 'loglevel': 'debug', + 'hexID': '0x16bf9b000', + 'event_type': 'MobileActivationDaemon handleActivationInfoWithSession:activationSignature:completionBlock:', + 'msg': 'Activation message:', + 'plist': {'AccountToken': 
'test', 'AccountTokenCertificate': 'test', 'unbrick': True}} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result, result['events'][0]) + + pass + + def test_multilinelog_curlybrackets(self): + # LATER parse the bracket as json, but it's a tough job: + # - find the first bracket, then the last bracket by counting up and down again, extract the inside. + # - but if the inside is not at the end, the rest of the text still needs to be added somewhere... So not sure what is the best. + s = '''Wed May 24 13:05:36 2023 [72] (0x16be43000) +[MCMMetadata readAndValidateMetadataAtFileUrl:forUserIdentity:containerClass:checkClassPath:transient:error:]: 199: Failed to validate metadata at URL [file:///private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155/.com.apple.mobile_container_manager.metadata.plist]: { + MCMMetadataActiveDPClass = 0; + MCMMetadataContentClass = 2; + MCMMetadataIdentifier = "com.apple.VoiceMemos"; + MCMMetadataInfo = { + "com.apple.MobileInstallation.ContentProtectionClass" = 0; + }; + MCMMetadataSchemaVersion = 1; + MCMMetadataUUID = "12036663-1F3A-45B3-A34C-402D5BB7D4FB"; + MCMMetadataUserIdentity = { + personaUniqueString = "83CB8039-725D-4462-84C2-7F79F0A6EFB3"; + posixGID = 501; + posixUID = 501; + type = 0; + version = 2; + }; + MCMMetadataVersion = 6; +} (Error Domain=MCMErrorDomain Code=29 "Invalid metadata-URLs should match: /private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155 : /private/var/mobile/Containers/Data/VPNPlugin/0984009B-81D1-4F7F-BDBD-261E22059155" UserInfo={SourceFileLine=370, NSLocalizedDescription=Invalid metadata-URLs should match: /private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155 : /private/var/mobile/Containers/Data/VPNPlugin/0984009B-81D1-4F7F-BDBD-261E22059155, FunctionName=+[MCMMetadata 
_readAndValidateMetadataInDictionary:containerURL:forUserIdentity:containerClass:checkClassPath:fsNode:transient:error:]}) +''' + expected_result = { + 'timestamp': '2023-05-24 13:05:36', + 'loglevel': 'err', + 'hexID': '0x16be43000', + 'msg': '+[MCMMetadata readAndValidateMetadataAtFileUrl:forUserIdentity:containerClass:checkClassPath:transient:error:]: 199: Failed to validate metadata at URL [file:///private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155/.com.apple.mobile_container_manager.metadata.plist]: {\n MCMMetadataActiveDPClass = 0;\n MCMMetadataContentClass = 2;\n MCMMetadataIdentifier = "com.apple.VoiceMemos";\n MCMMetadataInfo = {\n "com.apple.MobileInstallation.ContentProtectionClass" = 0;\n };\n MCMMetadataSchemaVersion = 1;\n MCMMetadataUUID = "12036663-1F3A-45B3-A34C-402D5BB7D4FB";\n MCMMetadataUserIdentity = {\n personaUniqueString = "83CB8039-725D-4462-84C2-7F79F0A6EFB3";\n posixGID = 501;\n posixUID = 501;\n type = 0;\n version = 2;\n };\n MCMMetadataVersion = 6;\n} (Error Domain=MCMErrorDomain Code=29 "Invalid metadata-URLs should match: /private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155 : /private/var/mobile/Containers/Data/VPNPlugin/0984009B-81D1-4F7F-BDBD-261E22059155" UserInfo={SourceFileLine=370, NSLocalizedDescription=Invalid metadata-URLs should match: /private/var/mobile/Containers/Data/Application/0984009B-81D1-4F7F-BDBD-261E22059155 : /private/var/mobile/Containers/Data/VPNPlugin/0984009B-81D1-4F7F-BDBD-261E22059155, FunctionName=+[MCMMetadata _readAndValidateMetadataInDictionary:containerURL:forUserIdentity:containerClass:checkClassPath:fsNode:transient:error:]})'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result, result['events'][0]) + + def test_multilinelog_simple_1(self): + s = '''Wed May 24 12:55:37 2023 [72] (0x16afb3000) -[MCMClientConnection _regenerateAllSystemContainerPaths]: Rolling system container directory UUIDs on disk''' 
+ expected_result = { + 'timestamp': '2023-05-24 12:55:37', + 'loglevel': 'notice', + 'hexID': '0x16afb3000', + 'event_type': 'MCMClientConnection _regenerateAllSystemContainerPaths', + 'msg': 'Rolling system container directory UUIDs on disk'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result, result['events'][0]) + + def test_mutlinelog_simple_2(self): + s = '''Wed May 24 13:05:30 2023 [72] (0x16be43000) _containermanagerd_init_block_invoke: containermanagerd first boot cleanup complete''' + expected_result = { + 'timestamp': '2023-05-24 13:05:30', + 'loglevel': 'notice', + 'hexID': '0x16be43000', + 'msg': '_containermanagerd_init_block_invoke: containermanagerd first boot cleanup complete'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result, result['events'][0]) + + def test_multilinelog_simple_multiplelines(self): + s = '''Wed May 24 13:05:30 2023 [72] (0x16be43000) _containermanagerd_init_block_invoke: containermanagerd first boot cleanup complete +Wed May 24 12:55:37 2023 [72] (0x16afb3000) -[MCMClientConnection _regenerateAllSystemContainerPaths]: Rolling system container directory UUIDs on disk''' + expected_result_0 = { + 'timestamp': '2023-05-24 13:05:30', + 'loglevel': 'notice', + 'hexID': '0x16be43000', + 'msg': '_containermanagerd_init_block_invoke: containermanagerd first boot cleanup complete'} + expected_result_1 = { + 'timestamp': '2023-05-24 12:55:37', + 'loglevel': 'notice', + 'hexID': '0x16afb3000', + 'event_type': 'MCMClientConnection _regenerateAllSystemContainerPaths', + 'msg': 'Rolling system container directory UUIDs on disk'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result_0, result['events'][0]) + self.assertDictEqual(expected_result_1, result['events'][1]) + + def test_mutilinelog_emptylines(self): + s = '''\n\n''' + result = multilinelog.extract_from_string(s) + self.assertEqual(0, len(result['events'])) + + def 
test_multilinelog_keyvalue(self): + s = '''Wed May 24 12:55:37 2023 [72] (0x16afb3000) -[MCMClientConnection _regenerateAllSystemContainerPaths]: Rolling system container directory UUIDs on disk +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: ____________________ Mobile Activation Startup _____________________ +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: build_version: 19H349 +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: internal_build: false +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: uid: 501 +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: user_name: mobile +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: system_container_path: /private/var/containers/Data/System/4E023926-12C3-401D-BE00-06FC33B50889 +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: regulatory_images_path: /private/var/containers/Shared/SystemGroup/AF534A77-07C2-4140-917E-BEE330B5B1AF +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: hardware_model: D101AP +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: product_type: iPhone9,3 +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: device_class: iPhone +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: has_telephony: true +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: should_hactivate: false +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: is_fpga: false +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: is_devfused_undemoted: false +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: is_prodfused_demoted: false +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: soc_generation: H9 +Wed May 24 13:08:13 2023 [135] (0x16f1db000) MA: main: ____________________________________________________________________''' + expected_result_0 = { + 'timestamp': '2023-05-24 12:55:37', + 'loglevel': 'notice', + 'hexID': '0x16afb3000', + 'event_type': 'MCMClientConnection _regenerateAllSystemContainerPaths', + 'msg': 'Rolling system container directory UUIDs 
on disk'} + expected_result_1 = { + 'timestamp': '2023-05-24 13:08:13', + 'loglevel': 'debug', + 'hexID': '0x16f1db000', + 'msg': 'MA: main: ____________________ Mobile Activation Startup _____________________', + 'build_version': '19H349', + 'internal_build': 'false', + 'uid': '501', + 'user_name': 'mobile', + 'system_container_path': '/private/var/containers/Data/System/4E023926-12C3-401D-BE00-06FC33B50889', + 'regulatory_images_path': '/private/var/containers/Shared/SystemGroup/AF534A77-07C2-4140-917E-BEE330B5B1AF', + 'hardware_model': 'D101AP', + 'product_type': 'iPhone9,3', + 'device_class': 'iPhone', + 'has_telephony': 'true', + 'should_hactivate': 'false', + 'is_fpga': 'false', + 'is_devfused_undemoted': 'false', + 'is_prodfused_demoted': 'false', + 'soc_generation': 'H9'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result_0, result['events'][0]) + self.assertDictEqual(expected_result_1, result['events'][1]) + + def test_multilinelog_keyvalue_onlyend(self): + s = '''Sat Feb 18 09:48:38 2023 [2695] (0x16dc37000) MA: main: ____________________________________________________________________ +Sat Feb 18 09:48:39 2023 [2695] (0x16dc37000) MA: dealwith_activation: Activation State: Activated''' + expected_result_0 = { + 'timestamp': '2023-02-18 09:48:38', + 'loglevel': 'debug', + 'hexID': '0x16dc37000', + 'msg': 'MA: main: ____________________________________________________________________'} + expected_result_1 = { + 'timestamp': '2023-02-18 09:48:39', + 'loglevel': 'debug', + 'hexID': '0x16dc37000', + 'msg': 'MA: dealwith_activation: Activation State: Activated'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result_0, result['events'][0]) + self.assertDictEqual(expected_result_1, result['events'][1]) + + def test_multilinelog_keyvalue_onlystart(self): + s = '''Fri Dec 2 11:32:19 2022 [84816] (0x16afff000) MA: main: ____________________ Mobile Activation Startup _____________________''' + 
expected_result = { + 'timestamp': '2022-12-02 11:32:19', + 'loglevel': 'debug', + 'hexID': '0x16afff000', + 'msg': 'MA: main: ____________________ Mobile Activation Startup _____________________'} + result = multilinelog.extract_from_string(s) + self.assertDictEqual(expected_result, result['events'][0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_parsers_mobileactivation.py b/tests/test_parsers_mobileactivation.py index 6f0cb1f..98ee322 100644 --- a/tests/test_parsers_mobileactivation.py +++ b/tests/test_parsers_mobileactivation.py @@ -11,6 +11,7 @@ def test_mobileactivation(self): for file in files: print(f'Parsing {file}') result = parsemobactiv([file]) + pass for item in result['events']: self.assertTrue('timestamp' in item) self.assertTrue('loglevel' in item) @@ -18,8 +19,6 @@ def test_mobileactivation(self): if item['loglevel'] == 'debug' and 'build_version' in item: self.assertTrue('build_version' in item) self.assertTrue('internal_build' in item) - self.assertTrue('product_type' in item) - self.assertTrue('device_class' in item) else: self.assertTrue('msg' in item) # self.assertTrue('event_type' in item) # not all logs have event_type diff --git a/tests/test_parsers_mobileinstallation.py b/tests/test_parsers_mobileinstallation.py index b5a272b..987126e 100644 --- a/tests/test_parsers_mobileinstallation.py +++ b/tests/test_parsers_mobileinstallation.py @@ -11,6 +11,7 @@ def test_mobileinstallation(self): for file in files: print(f'Parsing {file}') result = parsemobinstall([file]) + pass for item in result['events']: self.assertTrue('timestamp' in item) self.assertTrue('loglevel' in item) diff --git a/utils/multilinelog.py b/utils/multilinelog.py new file mode 100644 index 0000000..46e5945 --- /dev/null +++ b/utils/multilinelog.py @@ -0,0 +1,138 @@ +import re +import io +import misc + + +def extract_from_file(fname): + with open(fname, 'r', encoding="utf-8") as f: + return extract_from_iowrapper(f) + + +def 
extract_from_string(logstring): + return extract_from_iowrapper(io.StringIO(logstring)) + + +def extract_from_iowrapper(f: io.TextIOWrapper): + # multiline parsing with the following logic: + # - build an entry with the seen lines + # - upon discovery of a new entry, or the end of the file, consider the entry as complete and process the lines + # - discovery of a new entry is done based on the timestamp, as each new entry starts this way + events = {"events": []} + prev_lines = [] + kv_section = False # key-value section separated by a colon + for line in f: + timeregex = re.search(r"(?<=^)(.*?)(?= \[[0-9]+)", line) # Regex for timestamp + if '_____' in line and re.search(r": _{10,25} [^_]+ _{10,25}", line): + kv_section = 'start' + if timeregex and (not kv_section or kv_section == 'start' or kv_section == 'end'): + # new entry, process the previous entry + if kv_section == 'start': + kv_section = True + if kv_section == 'end': + kv_section = False + events['events'].append(build_from_kv_section(prev_lines)) + prev_lines = [] + continue # go to next line as current line is just the closure of the section + elif prev_lines: + new_entry = build_from_logentry(''.join(prev_lines)) + events['events'].append(new_entry) + # build the new entry + prev_lines = [] + prev_lines.append(line) + elif prev_lines or kv_section: + # not a new entry, add the line to the previous entry + prev_lines.append(line) + else: + pass + if kv_section and '_____' in line and re.search(r": _{40,80}$", line): # only end if kv_section was started + kv_section = 'end' + # process the last entry + if kv_section and len(prev_lines) > 1: + new_entry = build_from_kv_section(prev_lines) + else: + new_entry = build_from_logentry(''.join(prev_lines)) + if new_entry: + events['events'].append(new_entry) + return events + + +def build_from_kv_section(lines): + new_entry = build_from_logentry(lines.pop(0)) # first line is a normal line + if '_____' in lines[-1]: + lines.pop() # drop last line as it's 
just the closing line + # complement with key-value section + for line in lines: + splitted = line.split(":") + if len(splitted) > 1: + new_entry[splitted[-2].strip()] = splitted[-1].strip() + return new_entry + + +def build_from_logentry(line): + entry = {} + # timestamp + timeregex = re.search(r"(?<=^)(.*?)(?= \[[0-9]+)", line) # Regex for timestamp + if timeregex: + timestamp = timeregex.group(1) + weekday, month, day, time, year = (str.split(timestamp[:24])) + day = day_converter(day) + month = month_converter(month) + entry['timestamp'] = str(year) + '-' + str(month) + '-' + str(day) + ' ' + str(time) + + # log level + loglevelregex = re.search(r"\<(.*?)\>", line) + entry['loglevel'] = loglevelregex.group(1) + + # hex_ID + hexIDregex = re.search(r"\(0x(.*?)\)", line) + entry['hexID'] = '0x' + hexIDregex.group(1) + + # event_type + eventyperegex = re.search(r"\-\[(.*)(\]\:)", line) + if eventyperegex: + entry['event_type'] = eventyperegex.group(1) + + # msg + if 'event_type' in entry: + msgregex = re.search(r"\]\:(.*)", line, re.MULTILINE | re.DOTALL) + else: + msgregex = re.search(r"\)\ (.*)", line, re.MULTILINE | re.DOTALL) + line = msgregex.group(1).strip() + # plist parsing + if line.endswith(''): + plist_start = line.index('