diff --git a/README.md b/README.md
index 2c3fea2..d450dfe 100644
--- a/README.md
+++ b/README.md
@@ -162,11 +162,11 @@ sudo apt install cargo
 Now you can download and compile the code:
 ```bash
 git clone https://github.com/mandiant/macos-UnifiedLogs
-cd macos-UnifiedLogs/examples/unifiedlog_parser_json/
+cd macos-UnifiedLogs/examples/unifiedlog_iterator/
 cargo build --release
-sudo cp ../target/release/unifiedlog_parser_json /usr/local/bin/
+sudo cp ../target/release/unifiedlog_iterator /usr/local/bin/
 ```
-See `unifiedlog_parser_json --help` for more instructions to use the tool, or use it directly through sysdiagnose.
+See `unifiedlog_iterator --help` for more instructions to use the tool, or use it directly through sysdiagnose.
 
 # Supported iOS versions
diff --git a/src/sysdiagnose/parsers/logarchive.py b/src/sysdiagnose/parsers/logarchive.py
index 24812e5..a04d731 100644
--- a/src/sysdiagnose/parsers/logarchive.py
+++ b/src/sysdiagnose/parsers/logarchive.py
@@ -35,8 +35,7 @@
 # https://github.com/mandiant/macos-UnifiedLogs
 # Follow instruction in the README.md in order to install it.
 # TODO unifiedlog_parser is single threaded, either patch their code for multithreading support or do the magic here by parsing each file in a separate thread
-# cmd_parsing_linux = 'unifiedlog_parser_json --input %s --output %s'
-cmd_parsing_linux_test = ['unifiedlog_parser_json', '--help']
+cmd_parsing_linux_test = ['unifiedlog_iterator', '--help']
 
 # --------------------------------------------------------------------------- #
 
 # LATER consider refactoring using yield to lower memory consumption
@@ -214,7 +213,7 @@ def __convert_using_native_logparser(input_folder: str, output_file: str) -> lis
                 entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
                 f_out.write(json.dumps(entry_json) + '\n')
             except json.JSONDecodeError as e:
-                logger.warning(f"WARNING: error parsing JSON {line}", exc_info=True)
+                logger.warning(f"WARNING: error parsing JSON {line} - {e}", exc_info=True)
             except KeyError:
                 # last line of log does not contain 'time' field, nor the rest of the data.
                 # so just ignore it and all the rest.
@@ -223,44 +222,35 @@ def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> lis
                 break
 
     def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> list:
-        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
-        # run the conversion tool, saving to a temp folder
-        # read the created file/files, add timestamp
-        # sort based on time
-        # save to one single file in output folder
+        with open(output_file, 'w') as f:
+            for entry in LogarchiveParser.__convert_using_unifiedlogparser_generator(input_folder):
+                json.dump(entry, f)
+                f.write('\n')
 
-        # first check if binary exists in PATH, if not, return an error
-        try:
-            subprocess.check_output(cmd_parsing_linux_test, universal_newlines=True)
-        except FileNotFoundError:
-            logger.exception('ERROR: UnifiedLogReader not found, please install it. See README.md for more information.')
-            return
+    @DeprecationWarning
+    def __convert_using_unifiedlogparser_save_file(input_folder: str, output_file: str):
+        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
+        # output to stdout and not to a file as we need to convert the output to a unified format
+        cmd_array = ['unifiedlog_iterator', '--input', input_folder, '--output', output_file, '--format', 'jsonl']
+        # read each line, convert line by line and write the output directly to the new file
+        # this approach limits memory consumption
+        result = LogarchiveParser.__execute_cmd_and_get_result(cmd_array)
+        return result
 
-        # really run the tool now
-        entries = []
-        with tempfile.TemporaryDirectory() as tmp_outpath:
-            cmd_array = ['unifiedlog_parser_json', '--input', input_folder, '--output', tmp_outpath]
-            # run the command and get the result in our tmp_outpath folder
-            LogarchiveParser.__execute_cmd_and_get_result(cmd_array)
-            # read each file, conver line by line and write the output directly to the new file
-            # LATER run this in multiprocessing, one per file to speed up the process
-            for fname_reading in os.listdir(tmp_outpath):
-                with open(os.path.join(tmp_outpath, fname_reading), 'r') as f:
-                    for line in f:  # jsonl format - one json object per line
-                        try:
-                            entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
-                            entries.append(entry_json)
-                        except json.JSONDecodeError as e:
-                            logger.warning(f"WARNING: error parsing JSON {fname_reading}", exc_info=True)
-        # tempfolder is cleaned automatically after the block
-
-        # sort the data as it's not sorted by default, and we need sorted data for other analysers
-        entries.sort(key=lambda x: x['time'])
-        # save to file as JSONL
-        with open(output_file, 'w') as f_out:
-            for entry in entries:
-                f_out.write(json.dumps(entry))
-                f_out.write('\n')
+    def __convert_using_unifiedlogparser_generator(input_folder: str):
+        logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X')
+        # output to stdout and not to a file as we need to convert the output to a unified format
+        cmd_array = ['unifiedlog_iterator', '--input', input_folder, '--format', 'jsonl']
+        # read each line, convert line by line and write the output directly to the new file
+        # this approach limits memory consumption
+        for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array):
+            try:
+                entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line))
+                yield entry_json
+            except json.JSONDecodeError:
+                pass
+            except KeyError:
+                pass
 
     def __execute_cmd_and_yield_result(cmd_array: list) -> Generator[dict, None, None]:
         '''
diff --git a/src/sysdiagnose/utils/misc.py b/src/sysdiagnose/utils/misc.py
index efccdc4..781323a 100644
--- a/src/sysdiagnose/utils/misc.py
+++ b/src/sysdiagnose/utils/misc.py
@@ -8,6 +8,7 @@
 import json
 import nska_deserialize
 import os
+import heapq
 
 
 def merge_dicts(a: dict, b: dict) -> dict:
@@ -139,3 +140,45 @@ def find_bytes(d):
             # encoding is not always utf-8
             d[k] = binascii.hexlify(v).decode('utf-8')
     return d
+
+
+def sort_large_file(input_file, output_file, chunk_size=100000):
+    temp_files = []
+
+    try:
+        # Step 1: Split into sorted chunks
+        with open(input_file, "r") as infile:
+            chunk = []
+            for line in infile:
+                record = json.loads(line.strip())
+                chunk.append(record)
+
+                # When chunk size is reached, sort and write to a temporary file
+                if len(chunk) >= chunk_size:
+                    temp_file = f"temp_chunk_{len(temp_files)}.jsonl"
+                    with open(temp_file, "w") as tmp:
+                        for record in sorted(chunk, key=lambda x: x["timestamp"]):
+                            tmp.write(json.dumps(record) + "\n")
+                    temp_files.append(temp_file)
+                    chunk = []
+
+            # Sort and write any remaining records
+            if chunk:
+                temp_file = f"temp_chunk_{len(temp_files)}.jsonl"
+                with open(temp_file, "w") as tmp:
+                    for record in sorted(chunk, key=lambda x: x["timestamp"]):
+                        tmp.write(json.dumps(record) + "\n")
+                temp_files.append(temp_file)
+
+        # Step 2: Merge sorted chunks
+        with open(output_file, "w") as outfile:
+            open_files = [open(temp_file, "r") for temp_file in temp_files]
+            iterators = (map(json.loads, f) for f in open_files)
+            for record in heapq.merge(*iterators, key=lambda x: x["timestamp"]):
+                outfile.write(json.dumps(record) + "\n")
+    finally:
+        # Close all temporary files
+        for f in open_files:
+            f.close()
+        for f in temp_files:
+            os.remove(f)
\ No newline at end of file
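
The `logarchive.py` change above replaces the temp-folder batch approach with a streaming one: `__convert_using_unifiedlogparser_generator` launches `unifiedlog_iterator` with `--format jsonl`, consumes its output line by line through `__execute_cmd_and_yield_result` (whose body is not part of this diff), and yields one converted entry at a time so the whole log never sits in memory. The sketch below illustrates that pattern in isolation; the helper name `stream_unifiedlog_entries`, the use of `subprocess.Popen`, and the example paths are assumptions for illustration, not the project's actual implementation.

```python
# Hypothetical sketch of the streaming pattern used in the diff; not the project's actual code.
import json
import subprocess
from typing import Generator


def stream_unifiedlog_entries(input_folder: str) -> Generator[dict, None, None]:
    # Assumption: unifiedlog_iterator is on PATH and accepts the flags shown in the diff.
    cmd = ['unifiedlog_iterator', '--input', input_folder, '--format', 'jsonl']
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:  # jsonl output: one JSON object per line
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed or partial lines, as the parser does


if __name__ == '__main__':
    # Stream entries straight to a JSONL file without accumulating them in a list.
    with open('logarchive.jsonl', 'w') as f_out:
        for entry in stream_unifiedlog_entries('/path/to/logarchive_folder'):
            f_out.write(json.dumps(entry) + '\n')
```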
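Because the streamed output is no longer sorted inside the parser (the old `entries.sort(key=lambda x: x['time'])` step is removed), the diff also adds a `sort_large_file` helper to `misc.py`. It is an external merge sort: the input JSONL is split into chunks of `chunk_size` records, each chunk is sorted by its `timestamp` field and spilled to a temporary file, and the chunk files are then k-way merged with `heapq.merge`, so the full dataset never has to fit in memory. Below is a minimal usage sketch; the import path `sysdiagnose.utils.misc` is inferred from the file path in this diff, and the file names and sample records are made up.

```python
# Minimal usage sketch for the sort_large_file helper added in this diff (assumed import path).
import json

from sysdiagnose.utils.misc import sort_large_file

# Write a few out-of-order records; each record needs a sortable 'timestamp' field.
records = [
    {'timestamp': 3.0, 'message': 'third'},
    {'timestamp': 1.0, 'message': 'first'},
    {'timestamp': 2.0, 'message': 'second'},
]
with open('unsorted.jsonl', 'w') as f:
    for record in records:
        f.write(json.dumps(record) + '\n')

# Chunk, spill to temporary files, and heapq.merge them back into one sorted file.
# A small chunk_size forces several chunks even on this tiny input.
sort_large_file('unsorted.jsonl', 'sorted.jsonl', chunk_size=2)

with open('sorted.jsonl') as f:
    for line in f:
        print(json.loads(line)['timestamp'])  # 1.0, 2.0, 3.0
```

Note that the temporary chunk files (`temp_chunk_<n>.jsonl`) are created in the current working directory and are removed in the `finally` block once the merge has completed.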