diff --git a/.vscode/launch.json b/.vscode/launch.json index 7aa5dc5..870bd1f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -29,6 +29,14 @@ "args": "list analysers", "cwd": "${workspaceFolder}/" }, + { + "name": "Python Debugger: analyse.py analyse timeliner", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/analyse.py", + "args": "analyse timeliner 1", + "cwd": "${workspaceFolder}/" + }, { "name": "Python Debugger: analyse.py analyse apps 1", "type": "debugpy", diff --git a/analysers/apps.py b/analysers/apps.py index eca73c6..fb38daa 100644 --- a/analysers/apps.py +++ b/analysers/apps.py @@ -5,7 +5,6 @@ import os import json -import ijson import re analyser_description = 'Get list of Apps installed on the device' diff --git a/analysers/timeliner.py b/analysers/timeliner.py index 8e569a3..bcdb7ef 100644 --- a/analysers/timeliner.py +++ b/analysers/timeliner.py @@ -4,95 +4,76 @@ # Script to extract timestamp and generate a timesketch output # Author: david@autopsit.org # -# Important note: timestamp are in microseconds! standard epoch is in seconds. +# Important note: timestamp are in microseconds! standard epoch is in seconds. # FIXME is this correct? import os -import sys import json -from datetime import datetime -from optparse import OptionParser +from datetime import datetime, timezone +from parsers.logarchive import convert_entry_to_unifiedlog_format, convert_unifiedlog_time_to_datetime -version_string = "sysdiagnose-timeliner.py v2023-04-05 Version 0.1" - -# ----- definition for analyse.py script -----# -# ----- DO NOT DELETE ----# analyser_description = "Generate a Timesketch compatible timeline" -analyser_call = "generate_timeline" analyser_format = "jsonl" -# Structure: -# filename : parsing_function -timestamps_files = { - "sysdiagnose-accessibility-tcc.json": "__extract_ts_accessibility_tcc", - # itunesstore: TODO - "sysdiagnose-mobileactivation.json": "__extract_ts_mobileactivation", - "sysdiagnose-powerlogs.json": "__extract_ts_powerlogs", - "sysdiagnose-swcutil.json": "__extract_ts_swcutil", - "sysdiagnose-shutdownlogs.json": "__extract_ts_shutdownlogs", - "sysdiagnose-logarchive.json": "__extract_ts_logarchive", - "sysdiagnose-wifisecurity.json": "__extract_ts_wifisecurity", - "sysdiagnose_wifi_known_networks.json": "__extract_ts_wifi_known_networks", -} - # Timesketch format: # {"message": "A message","timestamp": 123456789,"datetime": "2015-07-24T19:01:01+00:00","timestamp_desc": "Write time","extra_field_1": "foo"} timeline = [] -def __extract_ts_mobileactivation(filename): +def __extract_ts_mobileactivation(case_folder: str) -> bool: try: - with open(filename, 'r') as fd: + filename = 'mobileactivation.json' + with open(os.path.join(case_folder, filename), 'r') as fd: data = json.load(fd) - if "events" in data.keys(): - for event in data["events"]: - timestamp = datetime.strptime(event["timestamp"], "%Y-%m-%d %H:%M:%S") - ts_event = { - "message": "Mobile Activation", - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "Mobile Activation Time", - "extra_field_1": "Build Version: %s" % event["build_version"] - } - timeline.append(ts_event) - else: - return False + for event in data: + ts_event = { + "message": "Mobile Activation", + "timestamp": event['timestamp'], + "datetime": event['datetime'], + "timestamp_desc": "Mobile Activation Time" + } + try: + ts_event["extra_field_1"] = "Build Version: %s" % event["build_version"] + except KeyError: + # 
skip other type of event + # FIXME what should we do? the log file (now) contains nice timestamps, do we want to extract less, but summarized, data? + continue + timeline.append(ts_event) return True except Exception as e: print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}") return False - return False -def __extract_ts_powerlogs(filename): +def __extract_ts_powerlogs(case_folder: str) -> bool: try: - with open(filename, 'r') as fd: + filename = 'powerlogs.json' + with open(os.path.join(case_folder, filename), 'r') as fd: data = json.load(fd) # extract tables of interest - __extract_ts_powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(data) # PLProcessMonitorAgent_EventPoint_ProcessExit - __extract_ts_powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(data) # PLProcessMonitorAgent_EventBackward_ProcessExitHistogram - __extract_ts_powerlogs__PLAccountingOperator_EventNone_Nodes(data) # PLAccountingOperator_EventNone_Nodes + __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(data) # PLProcessMonitorAgent_EventPoint_ProcessExit + __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(data) # PLProcessMonitorAgent_EventBackward_ProcessExitHistogram + __powerlogs__PLAccountingOperator_EventNone_Nodes(data) # PLAccountingOperator_EventNone_Nodes return True except Exception as e: print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}") return False - return False -def __extract_ts_powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(jdata): +def __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(jdata): proc_exit = jdata["PLProcessMonitorAgent_EventPoint_ProcessExit"] for proc in proc_exit: - timestamp = datetime.fromtimestamp(int(proc["timestamp"])) + timestamp = datetime.fromtimestamp(proc["timestamp"], tz=timezone.utc) extra_field = "" if "IsPermanent" in proc.keys(): extra_field = "Is permanent: %d" % proc["IsPermanent"] ts_event = { "message": proc["ProcessName"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "timestamp": proc["timestamp"], + "datetime": timestamp.isoformat(), "timestamp_desc": "Process Exit with reason code: %d reason namespace %d" % (proc["ReasonCode"], proc["ReasonNamespace"]), "extra_field_1": extra_field } @@ -100,14 +81,14 @@ def __extract_ts_powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(jdata): return -def __extract_ts_powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(jdata): +def __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(jdata): events = jdata["PLProcessMonitorAgent_EventBackward_ProcessExitHistogram"] for event in events: - timestamp = datetime.fromtimestamp(int(event["timestamp"])) + timestamp = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc) ts_event = { "message": event["ProcessName"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "timestamp": event["timestamp"], + "datetime": timestamp.isoformat(), "timestamp_desc": "Process Exit with reason code: %d reason namespace %d" % (event["ReasonCode"], event["ReasonNamespace"]), "extra_field_1": "Crash frequency: [0-5s]: %d, [5-10s]: %d, [10-60s]: %d, [60s+]: %d" % (event["0s-5s"], event["5s-10s"], event["10s-60s"], event["60s+"]) } @@ -115,21 +96,22 @@ def __extract_ts_powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHisto return -def __extract_ts_powerlogs__PLAccountingOperator_EventNone_Nodes(jdata): +def 
__powerlogs__PLAccountingOperator_EventNone_Nodes(jdata): eventnone = jdata["PLAccountingOperator_EventNone_Nodes"] for event in eventnone: - timestamp = datetime.fromtimestamp(int(event["timestamp"])) + timestamp = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc) ts_event = { "message": event["Name"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "timestamp": event["timestamp"], + "datetime": timestamp.isoformat(), "timestamp_desc": "PLAccountingOperator Event", "extra_field_1": "Is permanent: %d" % event["IsPermanent"] } timeline.append(ts_event) return -def __extract_ts_swcutil(filename): + +def __extract_ts_swcutil(case_folder: str) -> bool: """ FORMAT: "Service": "applinks", @@ -144,26 +126,33 @@ def __extract_ts_swcutil(filename): "Next Check": "2023-02-28 22:06:35 +0000" }, """ - with open(filename, 'r') as fd: - data = json.load(fd) - if "db" in data.keys(): - for service in data["db"]: - try: - timestamp = datetime.strptime(service["Last Checked"], "%Y-%m-%d %H:%M:%S %z") - ts_event = { - "message": service["Service"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "swcutil last checkeed", - "extra_field_1": "application: %s" % service["App ID"] - } - timeline.append(ts_event) - except Exception as e: - print(f"ERROR {filename} while extracting timestamp from {(service['Service'])} - {(service['App ID'])}. Record not inserted.") - return True + filename = 'swcutil.json' + try: + with open(os.path.join(case_folder, filename), 'r') as fd: + data = json.load(fd) + if "db" in data.keys(): + for service in data["db"]: + try: + timestamp = datetime.strptime(service["Last Checked"], "%Y-%m-%d %H:%M:%S %z") + ts_event = { + "message": service["Service"], + "timestamp": float(timestamp.timestamp()), + "datetime": timestamp.isoformat(), + "timestamp_desc": "swcutil last checkeed", + "extra_field_1": "application: %s" % service["App ID"] + } + timeline.append(ts_event) + except KeyError: + # some entries do not have a Last Checked or timestamp field + # print(f"WARNING {filename} while extracting timestamp from {(service['Service'])} - {(service['App ID'])}. Record not inserted.") + pass + return True + except Exception as e: + print(f"ERROR while extracting timestamp from {filename}. 
Reason {str(e)}") + return False -def __extract_ts_accessibility_tcc(filename): +def __extract_ts_accessibility_tcc(case_folder: str) -> bool: """ Service format { "service": "kTCCServiceCamera" }, @@ -180,17 +169,18 @@ def __extract_ts_accessibility_tcc(filename): { "flags": "None" }, { "last_modified": "1537694318" } """ + filename = 'accessibility_tcc.json' try: - with open(filename, 'r') as fd: + with open(os.path.join(case_folder, filename), 'r') as fd: data = json.load(fd) if "access" in data.keys(): for access in data["access"]: # create timeline entry - timestamp = datetime.fromtimestamp(int(access["last_modified"])) + timestamp = datetime.fromtimestamp(access["last_modified"], tz=timezone.utc) ts_event = { "message": access["service"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "timestamp": float(timestamp.timestamp()), + "datetime": timestamp.isoformat(), "timestamp_desc": "Accessibility TC Last Modified", "extra_field_1": "client: %s" % access["client"] } @@ -199,36 +189,35 @@ def __extract_ts_accessibility_tcc(filename): except Exception as e: print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}") return False - return False -def __extract_ts_shutdownlogs(filename): + +def __extract_ts_shutdownlogs(case_folder: str) -> bool: + filename = 'shutdownlogs.json' try: - with open(filename, 'r') as fd: + with open(os.path.join(case_folder, filename), 'r') as fd: data = json.load(fd) - for ts in data["data"].keys(): + for ts, processes in data.items(): try: # create timeline entries timestamp = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S+00:00") - processes = data["data"][ts] for p in processes: ts_event = { "message": p["path"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), + "timestamp": float(timestamp.timestamp()), + "datetime": timestamp.isoformat(), "timestamp_desc": "Entry in shutdown.log", "extra_field_1": "pid: %s" % p["pid"] } timeline.append(ts_event) except Exception as e: - print(f"WARNING: entry not parsed: {ts}") + print(f"WARNING: entry not parsed: {ts}. Reason: {str(e)}") return True except Exception as e: print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}") return False - return False -def __extract_ts_logarchive(filename): +def __extract_ts_logarchive(case_folder: str) -> bool: r""" Entry format: { @@ -263,31 +252,34 @@ def __extract_ts_logarchive(filename): "timezoneName" : "" }, """ # XXX FIXME pycodestyle error W605 when not using python's r-strings. Are the backslashes actually there in the data? - try: - with open(filename, 'r') as fd: - data = json.load(fd) - for trace in data["data"]: - try: - # create timeline entry - timestamp = datetime.strptime(trace["timestamp"], "%Y-%m-%d %H:%M:%S.%f%z") - ts_event = { - "message": trace["eventMessage"], - "timestamp": int(timestamp.timestamp() * 1000000), - "datetime": timestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "Entry in logarchive: %s" % trace["eventType"], - "extra_field_1": "subsystem: %s; processImageUUID: %s; processImagePath: %s" % (trace["subsystem"], trace["processImageUUID"], trace["processImagePath"]) - } - timeline.append(ts_event) - except Exception as e: - print(f"WARNING: trace not parsed: {trace}") - return True - except Exception as e: - print(f"ERROR while extracting timestamp from {filename}. 
Reason: {str(e)}") - return False - return False + logarchive_dir = os.path.join(case_folder, 'logarchive') + no_error = True + for file_in_logarchive_dir in os.listdir(logarchive_dir): + try: + with open(os.path.join(logarchive_dir, file_in_logarchive_dir), 'r') as fd: + for line in fd: + # standardise the logarchive entryto unifiedlog format + try: + trace = convert_entry_to_unifiedlog_format(json.loads(line)) + # create timeline entry + timestamp = convert_unifiedlog_time_to_datetime(trace["time"]) + ts_event = { + "message": trace["message"], + "timestamp": timestamp.timestamp(), + "datetime": timestamp.isoformat(), + "timestamp_desc": "Entry in logarchive: %s" % trace["event_type"], + "extra_field_1": f"subsystem: {trace["subsystem"]}; process_uuid: {trace["process_uuid"]}; process: {trace["process"]}; library: {trace["library"]}; library_uuid: {trace["library_uuid"]}" + } + timeline.append(ts_event) + except KeyError as e: + print(f"WARNING: trace not parsed: {trace}. Error {e}") + except Exception as e: + print(f"ERROR while extracting timestamp from {file_in_logarchive_dir}. Reason: {str(e)}") + no_error = False + return no_error -def __extract_ts_wifisecurity(filename): +def __extract_ts_wifisecurity(case_folder: str) -> bool: """ "accc": "", "acct": "SSID NAME", @@ -301,177 +293,115 @@ def __extract_ts_wifisecurity(filename): "sync": "1", "tomb": "0" """ + filename = 'wifisecurity.json' try: - with open(filename, 'r') as fd: + with open(os.path.join(case_folder, filename), 'r') as fd: data = json.load(fd) for wifi in data: - if bool(wifi): - # create timeline entry - ctimestamp = datetime.strptime(wifi["cdat"], "%Y-%m-%d %H:%M:%S %z") - mtimestamp = datetime.strptime(wifi["mdat"], "%Y-%m-%d %H:%M:%S %z") + # create timeline entry + ctimestamp = datetime.strptime(wifi["cdat"], "%Y-%m-%d %H:%M:%S %z") + mtimestamp = datetime.strptime(wifi["mdat"], "%Y-%m-%d %H:%M:%S %z") - # Event 1: creation - ts_event = { - "message": wifi["acct"], - "timestamp": int(ctimestamp.timestamp() * 1000000), - "datetime": ctimestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "SSID added to known secured WIFI list", - "extra_field_1": wifi["accc"] - } - timeline.append(ts_event) + # Event 1: creation + ts_event = { + "message": wifi["acct"], + "timestamp": float(ctimestamp.timestamp()), + "datetime": ctimestamp.isoformat(), + "timestamp_desc": "SSID added to known secured WIFI list", + "extra_field_1": wifi["accc"] + } + timeline.append(ts_event) - # Event 2: modification - ts_event = { - "message": wifi["acct"], - "timestamp": int(mtimestamp.timestamp() * 1000000), - "datetime": mtimestamp.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "SSID modified into the secured WIFI list", - "extra_field_1": wifi["accc"] - } - timeline.append(ts_event) + # Event 2: modification + ts_event = { + "message": wifi["acct"], + "timestamp": float(mtimestamp.timestamp()), + "datetime": mtimestamp.isoformat(), + "timestamp_desc": "SSID modified into the secured WIFI list", + "extra_field_1": wifi["accc"] + } + timeline.append(ts_event) return True except Exception as e: print(f"ERROR while extracting timestamp from {filename}. 
Reason {str(e)}") return False - return False -def __extract_ts_wifi_known_networks(filename): - with open(filename, 'r') as fd: - data = json.load(fd) - for wifi in data.keys(): - ssid = data[wifi]["SSID"] - try: - added = datetime.strptime(data[wifi]["AddedAt"], "%Y-%m-%d %H:%M:%S.%f") - +def __extract_ts_wifi_known_networks(case_folder: str) -> bool: + filename = 'wifi_known_networks.json' + try: + with open(os.path.join(case_folder, filename), 'r') as fd: + data = json.load(fd) + for item in data.values(): + ssid = item["SSID"] # WIFI added - ts_event = { - "message": "WIFI %s added" % ssid, - "timestamp": added.timestamp(), - "datetime": added.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "%s added in known networks plist", - "extra_field_1": "Add reason: %s" % data[wifi]["AddReason"] - } - timeline.append(ts_event) - except Exception as e: - print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.") + try: + added = datetime.strptime(item["AddedAt"], "%Y-%m-%d %H:%M:%S.%f") + ts_event = { + "message": "WIFI %s added" % ssid, + "timestamp": added.timestamp(), + "datetime": added.isoformat(), + "timestamp_desc": "%s added in known networks plist", + "extra_field_1": "Add reason: %s" % item["AddReason"] + } + timeline.append(ts_event) + except KeyError: + # some wifi networks do not have an AddedAt field + # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.") + pass # WIFI modified - try: - updated = datetime.strptime(data[wifi]["UpdatedAt"], "%Y-%m-%d %H:%M:%S.%f") - ts_event = { - "message": "WIFI %s added" % updated, - "timestamp": updated.timestamp(), - "datetime": updated.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "%s updated in known networks plist", - "extra_field_1": "Add reason: %s" % data[wifi]["AddReason"] - } - timeline.append(ts_event) - except Exception as e: - print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.") + try: + updated = datetime.strptime(item["UpdatedAt"], "%Y-%m-%d %H:%M:%S.%f") + ts_event = { + "message": "WIFI %s added" % updated, + "timestamp": updated.timestamp(), + "datetime": updated.isoformat(), + "timestamp_desc": "%s updated in known networks plist", + "extra_field_1": "Add reason: %s" % item["AddReason"] + } + timeline.append(ts_event) + except KeyError: + # some wifi networks do not have an UpdatedAt field + # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.") + pass # Password for wifi modified - try: - modified_password = datetime.strptime(data[wifi]["__OSSpecific__"]["WiFiNetworkPasswordModificationDate"], "%Y-%m-%d %H:%M:%S.%f") - ts_event = { - "message": "Password for WIFI %s modified" % ssid, - "timestamp": modified_password.timestamp(), - "datetime": modified_password.strftime("%Y-%m-%dT%H:%M:%S+00:00"), - "timestamp_desc": "%s password modified in known networks plist", - "extra_field_1": "AP mode: %s" % data[wifi]["__OSSpecific__"]["AP_MODE"] - } - timeline.append(ts_event) - except Exception as e: - print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. 
Record not inserted.") - - return True - - -def parse_json(jsondir): - """ - Call all the functions defined to extract timestamp from various artifacts - Return a JSON file compatible with TimeSketch - """ - # Loop through all the files to check - for parser in timestamps_files.keys(): - path = "%s/%s" % (jsondir, parser) - - if os.path.exists(path): - function_name = timestamps_files[parser] - parser_function = globals()[function_name] - parser_function(path) + try: + modified_password = datetime.strptime(item["__OSSpecific__"]["WiFiNetworkPasswordModificationDate"], "%Y-%m-%d %H:%M:%S.%f") + ts_event = { + "message": "Password for WIFI %s modified" % ssid, + "timestamp": modified_password.timestamp(), + "datetime": modified_password.isoformat(), + "timestamp_desc": "%s password modified in known networks plist", + "extra_field_1": "AP mode: %s" % item["__OSSpecific__"]["AP_MODE"] + } + timeline.append(ts_event) + except KeyError: + # some wifi networks do not have a password modification date + # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.") + pass + return True + except Exception as e: + print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}") + return False - # return the timeline as JSON - return timeline +def analyse_path(case_folder: str, output_file: str = 'timeliner.jsonl') -> bool: + # Get all the functions that start with '__extract_ts_' + # and call these with the case_folder as parameter + # FIXME move the for loop within the file write and change to yield + for func in globals(): + if func.startswith('__extract_ts_'): + globals()[func](case_folder) # call the function -def save_timeline(timeline, ts_file): - """ - Save timeline as JSONL (not JSON!!) - """ try: - with open(ts_file, 'w') as f: + with open(output_file, 'w') as f: for event in timeline: line = json.dumps(event) - f.write("%s\n" % line) + f.write(line) + f.write("\n") except Exception as e: - print(f"ERROR: impossible to save timeline to {timeline}. Reason: {str(e)}") - - -def generate_timeline(jsondir, filename): - """ - Generate the timeline and save it to filename - """ - timeline = parse_json(jsondir) - save_timeline(timeline, filename) - return - - -# --------------------------------------------------------------------------- # - - -def main(): - """ - Main function - """ - - print(f"Running {version_string}\n") - - usage = "\n%prog -d JSON directory\n" - - parser = OptionParser(usage=usage) - parser.add_option("-d", dest="inputdir", - action="store", type="string", - help="Directory containing JSON from parsers") - parser.add_option("-o", dest="outputfile", - action="store", type="string", - help="JSON tile to save the timeline") - (options, args) = parser.parse_args() - - # no arguments given by user, print help and exit - if len(sys.argv) == 1: - parser.print_help() - sys.exit(-1) - - # parse PS file :) - if options.inputdir: - timeline = parse_json(options.inputdir) - if options.outputfile: - save_timeline(timeline, options.outputfile) - else: - print(timeline) - else: - print("WARNING -i option is mandatory!") - - -# --------------------------------------------------------------------------- # - -""" - Call main function -""" -if __name__ == "__main__": - - # Create an instance of the Analysis class (called "base") and run main - main() - -# That's all folks ;) + print(f"ERROR: impossible to save timeline to {output_file}. 
Reason: {str(e)}") + return False + return True diff --git a/parsers/logarchive.py b/parsers/logarchive.py index c03f190..309d780 100644 --- a/parsers/logarchive.py +++ b/parsers/logarchive.py @@ -11,13 +11,10 @@ import tempfile import platform import subprocess +from datetime import datetime, timezone -version_string = "sysdiagnose-logarchive.py v2020-02-07 Version 1.0" -# ----- definition for parsing.py script -----# -# ----- DO NOT DELETE ----# - -parser_description = "Parsing system_logs.logarchive folder" +parser_description = 'Parsing system_logs.logarchive folder' # --------------------------------------------# @@ -27,19 +24,19 @@ # json JSON output. Event data is synthesized as an array of JSON dictionaries. # # ndjson Line-delimited JSON output. Event data is synthesized as JSON dictionaries, each emitted on a single line. -# A trailing record, identified by the inclusion of a "finished" field, is emitted to indicate the end of events. +# A trailing record, identified by the inclusion of a 'finished' field, is emitted to indicate the end of events. # -cmd_parsing_osx = "/usr/bin/log show %s --style ndjson" # fastest and short version -# cmd_parsing_osx = "/usr/bin/log show %s --style json" # fastest and short version -# cmd_parsing_osx = "/usr/bin/log show %s --info --style json" # to enable debug, add --debug -# cmd_parsing_osx = "/usr/bin/log show %s --info --debug --style json" +cmd_parsing_osx = '/usr/bin/log show %s --style ndjson' # fastest and short version +# cmd_parsing_osx = '/usr/bin/log show %s --style json' # fastest and short version +# cmd_parsing_osx = '/usr/bin/log show %s --info --style json' # to enable debug, add --debug +# cmd_parsing_osx = '/usr/bin/log show %s --info --debug --style json' # Linux parsing relies on UnifiedLogReader: # https://github.com/mandiant/macos-UnifiedLogs # Follow instruction in the README.md in order to install it. # TODO unifiedlog_parser is single threaded, either patch their code for multithreading support or do the magic here by parsing each file in a separate thread -cmd_parsing_linux = "unifiedlog_parser_json --input %s --output %s" -cmd_parsing_linux_test = ["unifiedlog_parser_json", "--help"] +cmd_parsing_linux = 'unifiedlog_parser_json --input %s --output %s' +cmd_parsing_linux_test = ['unifiedlog_parser_json', '--help'] # --------------------------------------------------------------------------- # @@ -60,31 +57,31 @@ def parse_path(path: str) -> list | dict: def parse_path_to_folder(path: str, output_folder: str) -> bool: try: - output_folder = os.path.join(output_folder, "logarchive") + output_folder = os.path.join(output_folder, 'logarchive') os.makedirs(output_folder, exist_ok=True) result = get_logs(get_log_files(path)[0], output=output_folder) if len(result) > 0: return True else: - print("Error:") + print('Error:') print(json.dumps(result, indent=4)) return False except IndexError: - print("Error: No system_logs.logarchive/ folder found in logs/ directory") + print('Error: No system_logs.logarchive/ folder found in logs/ directory') return False def get_logs(filename, output=None): - """ + ''' Parse the system_logs.logarchive. When running on OS X, use native tools. On other system use a 3rd party library. 
- """ + ''' if output is not None: output = os.path.join(output) os.makedirs(output, exist_ok=True) - if (platform.system() == "Darwin"): + if (platform.system() == 'Darwin'): if output is not None: - output = os.path.join(output, "logarchive.json") + output = os.path.join(output, 'logarchive.json') data = get_logs_on_osx(filename, output) return data else: @@ -99,13 +96,13 @@ def get_logs_on_osx(filename, output): def get_logs_on_linux(filename, output): - print("WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X") + print('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X') # check if binary exists in PATH, if not, return an error try: subprocess.check_output(cmd_parsing_linux_test, universal_newlines=True) except FileNotFoundError: - print("ERROR: UnifiedLogReader not found, please install it. See README.md for more information.") - return "" + print('ERROR: UnifiedLogReader not found, please install it. See README.md for more information.') + return '' if not output: with tempfile.TemporaryDirectory() as tmp_outpath: @@ -115,7 +112,7 @@ def get_logs_on_linux(filename, output): # read the content of all the files to a variable, a bit crazy as it will eat memory massively # but at least it will be compatible with the overall logic, when needed data = [] - print("WARNING: combining all output files in memory, this is slow and eat a LOT of memory. Use with caution.") + print('WARNING: combining all output files in memory, this is slow and eat a LOT of memory. Use with caution.') for fname in os.listdir(tmp_outpath): with open(os.path.join(tmp_outpath, fname), 'r') as f: try: @@ -135,14 +132,14 @@ def get_logs_on_linux(filename, output): def __execute_cmd_and_get_result(command, outputfile=None): - """ + ''' Return None if it failed or the result otherwise. Outfile can have 3 values: - None: no output except return value - sys.stdout: print to stdout - path to a file to write to - """ + ''' cmd_array = command.split() result = [] @@ -157,9 +154,75 @@ def __execute_cmd_and_get_result(command, outputfile=None): for line in iter(process.stdout.readline, ''): print(line) else: - with open(outputfile, "w") as outfd: + with open(outputfile, 'w') as outfd: for line in iter(process.stdout.readline, ''): outfd.write(line) result = f'Output written to {outputfile}' return result + + +def convert_entry_to_unifiedlog_format(entry: dict) -> dict: + ''' + Convert the entry to unifiedlog format + ''' + # already in the Mandiant unifiedlog format + if 'event_type' in entry: + return entry + ''' + jq '. 
|= keys' logarchive-native.json > native_keys.txt + sort native_keys.txt | uniq -c | sort -n > native_keys_sort_unique.txt + ''' + + mapper = { + 'creatorActivityID': 'activity_id', + 'messageType': 'log_type', + # 'source': '', # not present in the Mandiant format + # 'backtrace': '', # sub-dictionary + 'activityIdentifier': 'activity_id', + 'bootUUID': 'boot_uuid', # remove - in the UUID + 'category': 'category', + 'eventMessage': 'message', + 'eventType': 'event_type', + 'formatString': 'raw_message', + # 'machTimestamp': '', # not present in the Mandiant format + # 'parentActivityIdentifier': '', # not present in the Mandiant format + 'processID': 'pid', + 'processImagePath': 'process', + 'processImageUUID': 'process_uuid', # remove - in the UUID + 'senderImagePath': 'library', + 'senderImageUUID': 'library_uuid', # remove - in the UUID + # 'senderProgramCounter': '', # not present in the Mandiant format + 'subsystem': 'subsystem', + 'threadID': 'thread_id', + 'timestamp': 'time', # requires conversion + 'timezoneName': 'timezone_name', # ignore timezone as time and timestamp are correct + # 'traceID': '', # not present in the Mandiant format + 'userID': 'euid' + } + + new_entry = {} + for key, value in entry.items(): + if key in mapper: + new_key = mapper[key] + if 'uuid' in new_key: # remove - in UUID + new_entry[new_key] = value.replace('-', '') + else: + new_entry[new_key] = value + else: + # keep the non-matching entries + new_entry[key] = value + # convert time + new_entry['time'] = convert_native_time_to_unifiedlog_format(new_entry['time']) + return new_entry + + +def convert_native_time_to_unifiedlog_format(time: str) -> int: + timestamp = datetime.fromisoformat(time) + return int(timestamp.timestamp() * 1000000000) + + +def convert_unifiedlog_time_to_datetime(time: int) -> datetime: + # convert time to datetime object + timestamp = datetime.fromtimestamp(time / 1000000000, tz=timezone.utc) + return timestamp diff --git a/tests/test_parsers_logarchive.py b/tests/test_parsers_logarchive.py index 2ebfb6b..3228441 100644 --- a/tests/test_parsers_logarchive.py +++ b/tests/test_parsers_logarchive.py @@ -1,4 +1,4 @@ -from parsers.logarchive import get_log_files, parse_path, parse_path_to_folder +from parsers.logarchive import get_log_files, parse_path, parse_path_to_folder, convert_entry_to_unifiedlog_format, convert_unifiedlog_time_to_datetime, convert_native_time_to_unifiedlog_format from tests import SysdiagnoseTestCase import os import platform @@ -21,11 +21,11 @@ def test_get_logs_outputdir(self): # result should contain at least one entry (linux = stdout, mac = mention it's saved to a file) self.assertTrue(result) - if (platform.system() == "Darwin"): - self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "logarchive", "logarchive.json"))) + if (platform.system() == 'Darwin'): + self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, 'logarchive', 'logarchive.json'))) else: - self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "logarchive", "liveData.json"))) - with open(os.path.join(tmp_outpath, "logarchive", "liveData.json"), 'r') as f: + self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, 'logarchive', 'liveData.json'))) + with open(os.path.join(tmp_outpath, 'logarchive', 'liveData.json'), 'r') as f: for line in f: json_data = json.loads(line) self.assertTrue('subsystem' in json_data) @@ -38,6 +38,84 @@ def test_get_logs_result(self): # FIXME check result on a mac self.assertGreater(len(result), 0) + def test_convert_native_time_to_unifiedlog(self): + input = 
'2023-05-24 13:03:28.908085-0700' + expected_output = 1684958608908084992 + result = convert_native_time_to_unifiedlog_format(input) + self.assertEqual(result, expected_output) + + input = '2023-05-24 20:03:28.908085-0000' + expected_output = 1684958608908084992 + result = convert_native_time_to_unifiedlog_format(input) + self.assertEqual(result, expected_output) + + def test_convert_unifiedlog_time_to_datetime(self): + input = 1684958608908085200 + expected_output = '2023-05-24T20:03:28.908085+00:00' + result = convert_unifiedlog_time_to_datetime(input).isoformat() + self.assertEqual(result, expected_output) + + def test_convert_entry_to_un(self): + input = { + 'timezoneName': '', + 'messageType': 'Default', + 'eventType': 'logEvent', + 'source': None, + 'formatString': 'FIPSPOST_KEXT [%llu] %s:%d: PASSED: (%u ms) - fipspost_post_integrity\n', + 'userID': 0, + 'activityIdentifier': 0, + 'subsystem': '', + 'category': '', + 'threadID': 101, + 'senderImageUUID': 'A6F4A2BD-5575-37EB-91C0-28AB00C8FCBF', + 'backtrace': { + 'frames': [ + { + 'imageOffset': 6084, + 'imageUUID': 'A6F4A2BD-5575-37EB-91C0-28AB00C8FCBF' + } + ] + }, + 'bootUUID': '49BA93E4-C511-47A3-B6CC-62D80BFFE539', + 'processImagePath': '/kernel', + 'senderImagePath': '/System/Library/Extensions/corecrypto.kext/corecrypto', + 'timestamp': '2023-05-24 13:03:28.908085-0700', + 'machTimestamp': 161796510, + 'eventMessage': 'FIPSPOST_KEXT [161796151] fipspost_post:169: PASSED: (1 ms) - fipspost_post_integrity', + 'processImageUUID': '39395A83-7379-3C29-AB78-D1B5EDB9C714', + 'traceID': 444438921084932, + 'processID': 0, + 'senderProgramCounter': 6084, + 'parentActivityIdentifier': 0 + } + expected_output = { + 'timezone_name': '', + 'log_type': 'Default', + 'event_type': 'logEvent', + 'source': None, + 'raw_message': 'FIPSPOST_KEXT [%llu] %s:%d: PASSED: (%u ms) - fipspost_post_integrity\n', + 'euid': 0, + 'activity_id': 0, + 'subsystem': '', + 'category': '', + 'thread_id': 101, + 'library_uuid': 'A6F4A2BD557537EB91C028AB00C8FCBF', + 'backtrace': {'frames': [{'imageOffset': 6084, 'imageUUID': 'A6F4A2BD-5575-37EB-91C0-28AB00C8FCBF'}]}, + 'boot_uuid': '49BA93E4C51147A3B6CC62D80BFFE539', + 'process': '/kernel', + 'library': '/System/Library/Extensions/corecrypto.kext/corecrypto', + 'time': 1684958608908084992, + 'machTimestamp': 161796510, + 'message': 'FIPSPOST_KEXT [161796151] fipspost_post:169: PASSED: (1 ms) - fipspost_post_integrity', + 'process_uuid': '39395A8373793C29AB78D1B5EDB9C714', + 'traceID': 444438921084932, + 'pid': 0, + 'senderProgramCounter': 6084, + 'parentActivityIdentifier': 0 + } + result = convert_entry_to_unifiedlog_format(input) + self.assertEqual(result, expected_output) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_parsers_taskinfo.py b/tests/test_parsers_taskinfo.py index 5249e00..888aa2f 100644 --- a/tests/test_parsers_taskinfo.py +++ b/tests/test_parsers_taskinfo.py @@ -20,7 +20,7 @@ def test_get_tasks(self): # a delta is not abnormal, as numb_tasks seems to be taking the count from "ps". # "ps" always has at least two processes running (ps and psauxwww) # Execution of taskinfo happens at another moment, so other processes may have started/stopped - self.assertTrue((len_tasks > numb_tasks - 4)) + self.assertAlmostEqual(len_tasks, numb_tasks, delta=4) for task in result['tasks']: self.assertGreater(len(task['threads']), 0)
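
Reviewer note (illustrative, not part of the patch): the timeliner now delegates timestamp handling to the conversion helpers added in parsers/logarchive.py instead of parsing dates itself. Below is a minimal usage sketch of those helpers, assuming the modules are importable from the repository root (as analyse.py imports them); the sample timestamp and expected integer are the values asserted in the new unit test, and the event fields are reduced placeholders.

    from parsers.logarchive import (
        convert_entry_to_unifiedlog_format,
        convert_unifiedlog_time_to_datetime,
    )

    # A native `log show --style ndjson` entry, reduced to the fields used here.
    native_entry = {
        'eventType': 'logEvent',
        'eventMessage': 'example log message',
        'timestamp': '2023-05-24 13:03:28.908085-0700',
        'processImagePath': '/kernel',
        'processImageUUID': '39395A83-7379-3C29-AB78-D1B5EDB9C714',
    }

    # Keys are renamed to the Mandiant unifiedlog names, UUID dashes are
    # stripped, and the timestamp becomes nanoseconds since the epoch
    # (datetime.fromisoformat with a "-0700" offset needs Python 3.11+).
    entry = convert_entry_to_unifiedlog_format(native_entry)
    assert entry['event_type'] == 'logEvent'
    assert entry['process_uuid'] == '39395A8373793C29AB78D1B5EDB9C714'
    assert entry['time'] == 1684958608908084992  # value asserted in the unit test

    # __extract_ts_logarchive() turns that integer back into an aware datetime
    # before emitting the Timesketch event.
    dt = convert_unifiedlog_time_to_datetime(entry['time'])
    print(dt.isoformat())  # 2023-05-24T20:03:28.908085+00:00 (UTC)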