diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml
index 3012525..c98f97d 100644
--- a/.github/workflows/unittest.yml
+++ b/.github/workflows/unittest.yml
@@ -23,13 +23,13 @@ jobs:
 
     steps:
       - uses: actions/checkout@v4
-      - name: Set up unifiedlog_parser_json
+      - name: Set up unifiedlog_iterator
        if: matrix.os == 'ubuntu-latest'
        run: |
          git clone https://github.com/mandiant/macos-UnifiedLogs
-          cd macos-UnifiedLogs/examples/unifiedlog_parser_json/
+          cd macos-UnifiedLogs/examples/unifiedlog_iterator/
          cargo build --release
-          sudo cp ../target/release/unifiedlog_parser_json /usr/local/bin/
+          sudo cp ../target/release/unifiedlog_iterator /usr/local/bin/
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
diff --git a/README.md b/README.md
index 2c3fea2..d450dfe 100644
--- a/README.md
+++ b/README.md
@@ -162,11 +162,11 @@ sudo apt install cargo
 Now you can download and compile the code:
 ```bash
 git clone https://github.com/mandiant/macos-UnifiedLogs
-cd macos-UnifiedLogs/examples/unifiedlog_parser_json/
+cd macos-UnifiedLogs/examples/unifiedlog_iterator/
 cargo build --release
-sudo cp ../target/release/unifiedlog_parser_json /usr/local/bin/
+sudo cp ../target/release/unifiedlog_iterator /usr/local/bin/
 ```
-See `unifiedlog_parser_json --help` for more instructions to use the tool, or use it directly through sysdiagnose.
+See `unifiedlog_iterator --help` for more instructions to use the tool, or use it directly through sysdiagnose.
 
 # Supported iOS versions
 
diff --git a/src/sysdiagnose/parsers/accessibility_tcc.py b/src/sysdiagnose/parsers/accessibility_tcc.py
index 832cd87..614b130 100644
--- a/src/sysdiagnose/parsers/accessibility_tcc.py
+++ b/src/sysdiagnose/parsers/accessibility_tcc.py
@@ -6,7 +6,7 @@
 import glob
 import os
 
-from sysdiagnose.utils.base import BaseParserInterface
+from sysdiagnose.utils.base import BaseParserInterface, logger
 from sysdiagnose.utils.apollo import Apollo
 
 
@@ -31,7 +31,7 @@ def execute(self) -> list | dict:
         # only one file to parse
         try:
             result = []
-            apollo = Apollo(os_version='yolo')  # FIXME get right OS version, but also update the Apollo modules to be aware of the latest OS versions
+            apollo = Apollo(logger=logger, os_version='yolo')  # FIXME get right OS version, but also update the Apollo modules to be aware of the latest OS versions
             for logfile in self.get_log_files():
                 result.extend(apollo.parse_db(db_fname=logfile, db_type='TCC.db'))
             return result
diff --git a/src/sysdiagnose/parsers/logarchive.py b/src/sysdiagnose/parsers/logarchive.py
index 24812e5..a04d731 100644
--- a/src/sysdiagnose/parsers/logarchive.py
+++ b/src/sysdiagnose/parsers/logarchive.py
@@ -35,8 +35,7 @@
 # https://github.com/mandiant/macos-UnifiedLogs
 # Follow instruction in the README.md in order to install it.
 # TODO unifiedlog_parser is single threaded, either patch their code for multithreading support or do the magic here by parsing each file in a separate thread
-# cmd_parsing_linux = 'unifiedlog_parser_json --input %s --output %s'
-cmd_parsing_linux_test = ['unifiedlog_parser_json', '--help']
+cmd_parsing_linux_test = ['unifiedlog_iterator', '--help']
 
 # --------------------------------------------------------------------------- #
 # LATER consider refactoring using yield to lower memory consumption
@@ -214,7 +213,7 @@ def __convert_using_native_logparser(input_folder: str, output_file: str) -> lis
                     entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
                     f_out.write(json.dumps(entry_json) + '\n')
                 except json.JSONDecodeError as e:
-                    logger.warning(f"WARNING: error parsing JSON {line}", exc_info=True)
+                    logger.warning(f"WARNING: error parsing JSON {line} - {e}", exc_info=True)
                 except KeyError:
                     # last line of log does not contain 'time' field, nor the rest of the data.
                     # so just ignore it and all the rest.
@@ -223,44 +222,35 @@ def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> lis
                     break
 
     def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> list:
-        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
-        # run the conversion tool, saving to a temp folder
-        # read the created file/files, add timestamp
-        # sort based on time
-        # save to one single file in output folder
+        with open(output_file, 'w') as f:
+            for entry in LogarchiveParser.__convert_using_unifiedlogparser_generator(input_folder):
+                json.dump(entry, f)
+                f.write('\n')
 
-        # first check if binary exists in PATH, if not, return an error
-        try:
-            subprocess.check_output(cmd_parsing_linux_test, universal_newlines=True)
-        except FileNotFoundError:
-            logger.exception('ERROR: UnifiedLogReader not found, please install it. See README.md for more information.')
-            return
+    @DeprecationWarning
+    def __convert_using_unifiedlogparser_save_file(input_folder: str, output_file: str):
+        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
+        # output to stdout and not to a file as we need to convert the output to a unified format
+        cmd_array = ['unifiedlog_iterator', '--input', input_folder, '--output', output_file, '--format', 'jsonl']
+        # read each line, convert line by line and write the output directly to the new file
+        # this approach limits memory consumption
+        result = LogarchiveParser.__execute_cmd_and_get_result(cmd_array)
+        return result
 
-        # really run the tool now
-        entries = []
-        with tempfile.TemporaryDirectory() as tmp_outpath:
-            cmd_array = ['unifiedlog_parser_json', '--input', input_folder, '--output', tmp_outpath]
-            # run the command and get the result in our tmp_outpath folder
-            LogarchiveParser.__execute_cmd_and_get_result(cmd_array)
-            # read each file, conver line by line and write the output directly to the new file
-            # LATER run this in multiprocessing, one per file to speed up the process
-            for fname_reading in os.listdir(tmp_outpath):
-                with open(os.path.join(tmp_outpath, fname_reading), 'r') as f:
-                    for line in f:  # jsonl format - one json object per line
-                        try:
-                            entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
-                            entries.append(entry_json)
-                        except json.JSONDecodeError as e:
-                            logger.warning(f"WARNING: error parsing JSON {fname_reading}", exc_info=True)
-            # tempfolder is cleaned automatically after the block
-
-        # sort the data as it's not sorted by default, and we need sorted data for other analysers
-        entries.sort(key=lambda x: x['time'])
-        # save to file as JSONL
-        with open(output_file, 'w') as f_out:
-            for entry in entries:
-                f_out.write(json.dumps(entry))
-                f_out.write('\n')
+    def __convert_using_unifiedlogparser_generator(input_folder: str):
+        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
+        # output to stdout and not to a file as we need to convert the output to a unified format
+        cmd_array = ['unifiedlog_iterator', '--input', input_folder, '--format', 'jsonl']
+        # read each line, convert line by line and write the output directly to the new file
+        # this approach limits memory consumption
+        for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array):
+            try:
+                entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
+                yield entry_json
+            except json.JSONDecodeError:
+                pass
+            except KeyError:
+                pass
 
     def __execute_cmd_and_yield_result(cmd_array: list) -> Generator[dict, None, None]:
         '''
diff --git a/src/sysdiagnose/parsers/powerlogs.py b/src/sysdiagnose/parsers/powerlogs.py
index 6d97a09..1c86a29 100644
--- a/src/sysdiagnose/parsers/powerlogs.py
+++ b/src/sysdiagnose/parsers/powerlogs.py
@@ -6,7 +6,7 @@
 import glob
 import os
 
-from sysdiagnose.utils.base import BaseParserInterface
+from sysdiagnose.utils.base import BaseParserInterface, logger
 from sysdiagnose.utils.apollo import Apollo
 
 
@@ -34,7 +34,7 @@ def get_log_files(self) -> list:
 
     def execute(self) -> list:
         result = []
-        apollo = Apollo(os_version='yolo')  # FIXME get right OS version, but also update the Apollo modules to be aware of the latest OS versions
+        apollo = Apollo(logger=logger, os_version='yolo')  # FIXME get right OS version, but also update the Apollo modules to be aware of the latest OS versions
         for logfile in self.get_log_files():
             result.extend(apollo.parse_db(db_fname=logfile, db_type='CurrentPowerlog.PLSQL'))
 
diff --git a/src/sysdiagnose/parsers/spindumpnosymbols.py b/src/sysdiagnose/parsers/spindumpnosymbols.py
index 2333a66..ce277f3 100644
--- a/src/sysdiagnose/parsers/spindumpnosymbols.py
+++ b/src/sysdiagnose/parsers/spindumpnosymbols.py
@@ -45,7 +45,11 @@ def parse_file(path: str) -> list:
 
             # stripping
             for line in f_in:
-                if line.strip() == "" or line.strip() == "Heavy format: stacks are sorted by count" or line.strip() == "Use -i and -timeline to re-report with chronological sorting":
+                if line.strip() == "No samples":
+                    status = 'empty'
+                    # Since the rest is just 'binary format', we ignore the rest of the file.
+                    break
+                elif line.strip() == "" or line.strip() == "Heavy format: stacks are sorted by count" or line.strip() == "Use -i and -timeline to re-report with chronological sorting":
                     continue
                 elif line.strip() == "------------------------------------------------------------":
                     status = 'processes_raw'
@@ -62,9 +66,12 @@ def parse_file(path: str) -> list:
 
         # call parsing function per section
         events = []
-        basic = SpindumpNoSymbolsParser.parse_basic(headers)
-        events.append(basic)
-        events.extend(SpindumpNoSymbolsParser.parse_processes(processes_raw, start_timestamp=basic['timestamp']))
+        if status != 'empty':
+            basic = SpindumpNoSymbolsParser.parse_basic(headers)
+            events.append(basic)
+            events.extend(SpindumpNoSymbolsParser.parse_processes(processes_raw, start_timestamp=basic['timestamp']))
+        # Logging
+        logger.debug(f"{len(events)} events retrieved", extra={'num_events': len(events)})
 
         return events
 
diff --git a/src/sysdiagnose/utils/apollo.py b/src/sysdiagnose/utils/apollo.py
index fd0754c..06206c7 100644
--- a/src/sysdiagnose/utils/apollo.py
+++ b/src/sysdiagnose/utils/apollo.py
@@ -64,12 +64,13 @@
 import re
 from datetime import datetime, timezone
 import glob
+import logging
 
 default_mod_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'apollo_modules')
 
 
 class Apollo():
-    def __init__(self, mod_dir: str = default_mod_dir, os_version: str = 'yolo'):
+    def __init__(self, logger: logging.Logger, mod_dir: str = default_mod_dir, os_version: str = 'yolo'):
         """
         Initialize the Apollo class for parsing databases
 
@@ -77,6 +78,7 @@ def __init__(self, mod_dir: str = default_mod_dir, os_version: str = 'yolo'):
             mod_dir (str): The directory where the module definitions are stored
             os_version (str): The version of the OS for which to parse the modules. 'yolo' means all versions.
         """
+        self.logger = logger
         self.os_version = os_version
         self.mod_dir = mod_dir
 
@@ -125,7 +127,7 @@ def parse_db(self, db_fname: str, db_type: str = None) -> list:
         try:
             module_queries = self.modules[db_type]
         except KeyError:
-            print(f"No modules with queries for {db_type}.")
+            self.logger.exception(f"No modules with queries for {db_type}.")
             return results
 
         # establish db connection
@@ -140,11 +142,14 @@
                 cur.execute(module_query['sql'])
                 rows = cur.fetchall()
             except Exception:
-                print(f"WARNING: Cannot fetch query contents for query with name: {module_query['name']}.")
+                self.logger.warning(
+                    f"WARNING: Cannot fetch query contents for query with name: {module_query['name']}.",
+                    extra={"apollo_module": module_query['name']}, exc_info=True)
                 continue
 
             if not rows:
-                print(f"No Records Found for {module_query['name']}.")
+                self.logger.info(f"No Records Found for {module_query['name']}.",
+                                 extra={"apollo_module": module_query['name']})
                 continue
 
             headers = []
@@ -164,7 +169,9 @@
                         results.append(item)
                 except TypeError:
                     # problem with timestamp parsing
-                    print(f"WARNING: Problem with timestamp parsing for table {db_fname}, row {list(row)}")
+                    self.logger.warning(f"WARNING: Problem with timestamp parsing for table {db_fname}, row {list(row)}",
+                                        extra={"apollo_module": module_query['name'], "table": db_fname, "row": list(row)},
+                                        exc_info=True)
 
-            print("Executing module on: " + db_fname)
+            self.logger.info("Executing module on: " + db_fname, extra={"apollo_module": module_query['name'], "table": db_fname})
         return results
diff --git a/src/sysdiagnose/utils/logger.py b/src/sysdiagnose/utils/logger.py
index 7ad8b84..90d94db 100644
--- a/src/sysdiagnose/utils/logger.py
+++ b/src/sysdiagnose/utils/logger.py
@@ -3,8 +3,8 @@
 from datetime import datetime
 
 logger = logging.getLogger('sysdiagnose')
-# By default, we want to have the possibility to log almost everything.
-logger.setLevel(logging.INFO)
+# By default, we want to have the possibility to log everything.
+logger.setLevel(logging.DEBUG)
 
 
 class SysdiagnoseJsonFormatter(jsonlogger.JsonFormatter):
@@ -31,13 +31,13 @@ def get_console_handler(level: str) -> logging.StreamHandler:
     return ch
 
 
-def get_json_handler(filename: str, level: int = logging.INFO) -> logging.FileHandler:
+def get_json_handler(filename: str, level: int = logging.DEBUG) -> logging.FileHandler:
     '''
     Creates a logging JSON format file handler.
 
     Args:
        filename: Filename where to log.
-       level: Logging level. By default to INFO. https://docs.python.org/3/library/logging.html#logging-levels
+       level: Logging level. By default to DEBUG. https://docs.python.org/3/library/logging.html#logging-levels
     '''
     fmt_json = SysdiagnoseJsonFormatter(
         fmt='%(created)f %(asctime)s %(levelname)s %(module)s %(message)s',
diff --git a/src/sysdiagnose/utils/misc.py b/src/sysdiagnose/utils/misc.py
index efccdc4..9a726b7 100644
--- a/src/sysdiagnose/utils/misc.py
+++ b/src/sysdiagnose/utils/misc.py
@@ -8,6 +8,7 @@
 import json
 import nska_deserialize
 import os
+import heapq
 
 
 def merge_dicts(a: dict, b: dict) -> dict:
@@ -139,3 +140,45 @@ def find_bytes(d):
             # encoding is not always utf-8
             d[k] = binascii.hexlify(v).decode('utf-8')
     return d
+
+
+def sort_large_file(input_file, output_file, chunk_size=100000):
+    temp_files = []
+
+    try:
+        # Step 1: Split into sorted chunks
+        with open(input_file, "r") as infile:
+            chunk = []
+            for line in infile:
+                record = json.loads(line.strip())
+                chunk.append(record)
+
+                # When chunk size is reached, sort and write to a temporary file
+                if len(chunk) >= chunk_size:
+                    temp_file = f"temp_chunk_{len(temp_files)}.jsonl"
+                    with open(temp_file, "w") as tmp:
+                        for record in sorted(chunk, key=lambda x: x["timestamp"]):
+                            tmp.write(json.dumps(record) + "\n")
+                    temp_files.append(temp_file)
+                    chunk = []
+
+            # Sort and write any remaining records
+            if chunk:
+                temp_file = f"temp_chunk_{len(temp_files)}.jsonl"
+                with open(temp_file, "w") as tmp:
+                    for record in sorted(chunk, key=lambda x: x["timestamp"]):
+                        tmp.write(json.dumps(record) + "\n")
+                temp_files.append(temp_file)
+
+        # Step 2: Merge sorted chunks
+        with open(output_file, "w") as outfile:
+            open_files = [open(temp_file, "r") for temp_file in temp_files]
+            iterators = (map(json.loads, f) for f in open_files)
+            for record in heapq.merge(*iterators, key=lambda x: x["timestamp"]):
+                outfile.write(json.dumps(record) + "\n")
+    finally:
+        # Close all temporary files
+        for f in open_files:
+            f.close()
+        for f in temp_files:
+            os.remove(f)
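Note on the new conversion path in logarchive.py: instead of asking the Mandiant tool to write a temporary directory of JSON files and loading them all into memory, the patch streams `unifiedlog_iterator --format jsonl` output line by line through `__convert_using_unifiedlogparser_generator()`. The helper it relies on, `__execute_cmd_and_yield_result()`, is not shown in this patch. The following standalone sketch illustrates the same streaming pattern; the function name `stream_unifiedlog`, the output path, and the assumption that `unifiedlog_iterator` prints one JSON object per line to stdout when `--output` is omitted are mine, only the CLI flags are taken from the hunk above.

import json
import subprocess


def stream_unifiedlog(input_folder: str):
    # Hypothetical stand-in for LogarchiveParser.__execute_cmd_and_yield_result():
    # spawn unifiedlog_iterator and yield one parsed JSON entry per stdout line.
    cmd_array = ['unifiedlog_iterator', '--input', input_folder, '--format', 'jsonl']
    with subprocess.Popen(cmd_array, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # skip partial or garbled lines instead of aborting the whole conversion
                continue


if __name__ == '__main__':
    # Usage sketch: write the stream back out as JSONL, one entry per line.
    with open('logarchive.jsonl', 'w') as f_out:
        for entry in stream_unifiedlog('./system_logs.logarchive'):
            f_out.write(json.dumps(entry) + '\n')

Because entries are consumed one at a time, memory stays flat regardless of how large the log archive is, which is the point of the generator-based refactoring.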
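Note on `sort_large_file()` added to misc.py: it is a classic external merge sort. The input JSONL file is split into chunks of at most `chunk_size` records, each chunk is sorted by its `timestamp` key and flushed to a `temp_chunk_N.jsonl` file in the current working directory, and the chunks are then merged back together with `heapq.merge()`, so memory use is bounded by the chunk size rather than the file size. A minimal usage sketch, with made-up file names and records (only the `timestamp` key is required by the function):

import json

from sysdiagnose.utils.misc import sort_large_file

# Build a small, unsorted JSONL input. Real inputs are large log exports;
# 'timestamp' is the key sort_large_file() orders records by.
records = [
    {'timestamp': 3.0, 'message': 'third'},
    {'timestamp': 1.0, 'message': 'first'},
    {'timestamp': 2.0, 'message': 'second'},
]
with open('unsorted.jsonl', 'w') as f:
    for record in records:
        f.write(json.dumps(record) + '\n')

# chunk_size=2 forces two temporary chunk files, exercising the merge step.
sort_large_file('unsorted.jsonl', 'sorted.jsonl', chunk_size=2)

with open('sorted.jsonl') as f:
    for line in f:
        print(json.loads(line)['message'])  # prints: first, second, third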