diff --git a/analysers/timeliner.py b/analysers/timeliner.py
index e70f1a5..c372b98 100644
--- a/analysers/timeliner.py
+++ b/analysers/timeliner.py
@@ -10,6 +10,7 @@
 import json
 from datetime import datetime, timezone
 from parsers.logarchive import convert_entry_to_unifiedlog_format, convert_unifiedlog_time_to_datetime
+from collections.abc import Generator
 
 analyser_description = 'Generate a Timesketch compatible timeline'
 
@@ -18,10 +19,9 @@
 
 # Timesketch format:
 # {'message': 'A message','timestamp': 123456789,'datetime': '2015-07-24T19:01:01+00:00','timestamp_desc': 'Write time','extra_field_1': 'foo'}
-timeline = []
 
 
-def __extract_ts_mobileactivation(case_folder: str) -> bool:
+def __extract_ts_mobileactivation(case_folder: str) -> Generator[dict, None, None]:
     try:
         filename = 'mobileactivation.json'
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -39,27 +39,26 @@ def __extract_ts_mobileactivation(case_folder: str) -> bool:
                     # skip other type of event
                     # FIXME what should we do? the log file (now) contains nice timestamps, do we want to extract less, but summarized, data?
                     continue
-                timeline.append(ts_event)
-        return True
+                yield ts_event
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}")
-        return False
 
 
-def __extract_ts_powerlogs(case_folder: str) -> bool:
+def __extract_ts_powerlogs(case_folder: str) -> Generator[dict, None, None]:
     try:
         filename = 'powerlogs.json'
         with open(os.path.join(case_folder, filename), 'r') as fd:
             data = json.load(fd)
 
             # extract tables of interest
-            __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(data)  # PLProcessMonitorAgent_EventPoint_ProcessExit
-            __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(data)  # PLProcessMonitorAgent_EventBackward_ProcessExitHistogram
-            __powerlogs__PLAccountingOperator_EventNone_Nodes(data)  # PLAccountingOperator_EventNone_Nodes
-        return True
+            for entry in __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(data):
+                yield entry
+            for entry in __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(data):
+                yield entry
+            for entry in __powerlogs__PLAccountingOperator_EventNone_Nodes(data):
+                yield entry
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}")
-        return False
 
 
 def __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(jdata):
@@ -77,8 +76,7 @@ def __powerlogs__PLProcessMonitorAgent_EventPoint_ProcessExit(jdata):
             'timestamp_desc': 'Process Exit with reason code: %d reason namespace %d' % (proc['ReasonCode'], proc['ReasonNamespace']),
             'extra_field_1': extra_field
         }
-        timeline.append(ts_event)
-    return
+        yield ts_event
 
 
 def __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(jdata):
@@ -92,8 +90,7 @@ def __powerlogs__PLProcessMonitorAgent_EventBackward_ProcessExitHistogram(jdata)
             'timestamp_desc': 'Process Exit with reason code: %d reason namespace %d' % (event['ReasonCode'], event['ReasonNamespace']),
             'extra_field_1': 'Crash frequency: [0-5s]: %d, [5-10s]: %d, [10-60s]: %d, [60s+]: %d' % (event['0s-5s'], event['5s-10s'], event['10s-60s'], event['60s+'])
         }
-        timeline.append(ts_event)
-    return
+        yield ts_event
 
 
 def __powerlogs__PLAccountingOperator_EventNone_Nodes(jdata):
@@ -107,11 +104,10 @@
             'timestamp_desc': 'PLAccountingOperator Event',
             'extra_field_1': 'Is permanent: %d' % event['IsPermanent']
         }
-        timeline.append(ts_event)
-    return
+        yield ts_event
 
 
-def __extract_ts_swcutil(case_folder: str) -> bool:
+def __extract_ts_swcutil(case_folder: str) -> Generator[dict, None, None]:
     filename = 'swcutil.json'
     try:
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -127,18 +123,16 @@ def __extract_ts_swcutil(case_folder: str) -> bool:
                         'timestamp_desc': 'swcutil last checkeed',
                         'extra_field_1': 'application: %s' % service['App ID']
                     }
-                    timeline.append(ts_event)
+                    yield ts_event
                 except KeyError:
                     # some entries do not have a Last Checked or timestamp field
                     # print(f"WARNING {filename} while extracting timestamp from {(service['Service'])} - {(service['App ID'])}. Record not inserted.")
                     pass
-        return True
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}")
-        return False
 
 
-def __extract_ts_accessibility_tcc(case_folder: str) -> bool:
+def __extract_ts_accessibility_tcc(case_folder: str) -> Generator[dict, None, None]:
     filename = 'accessibility_tcc.json'
     try:
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -154,14 +148,12 @@ def __extract_ts_accessibility_tcc(case_folder: str) -> bool:
                     'timestamp_desc': 'Accessibility TC Last Modified',
                     'extra_field_1': 'client: %s' % access['client']
                 }
-                timeline.append(ts_event)
-        return True
+                yield ts_event
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}")
-        return False
 
 
-def __extract_ts_shutdownlogs(case_folder: str) -> bool:
+def __extract_ts_shutdownlogs(case_folder: str) -> Generator[dict, None, None]:
     filename = 'shutdownlogs.json'
     try:
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -178,18 +170,15 @@ def __extract_ts_shutdownlogs(case_folder: str) -> bool:
                        'timestamp_desc': 'Entry in shutdown.log',
                        'extra_field_1': 'pid: %s' % p['pid']
                     }
-                    timeline.append(ts_event)
+                    yield ts_event
                 except Exception as e:
                     print(f"WARNING: shutdownlog entry not parsed: {ts}. Reason: {str(e)}")
-        return True
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason: {str(e)}")
-        return False
 
 
-def __extract_ts_logarchive(case_folder: str) -> bool:
+def __extract_ts_logarchive(case_folder: str) -> Generator[dict, None, None]:
     logarchive_dir = os.path.join(case_folder, 'logarchive')
-    no_error = True
     for file_in_logarchive_dir in os.listdir(logarchive_dir):
         try:
             with open(os.path.join(logarchive_dir, file_in_logarchive_dir), 'r') as fd:
@@ -206,16 +195,14 @@
                        'timestamp_desc': 'Entry in logarchive: %s' % trace['event_type'],
                        'extra_field_1': f"subsystem: {trace['subsystem']}; process_uuid: {trace['process_uuid']}; process: {trace['process']}; library: {trace['library']}; library_uuid: {trace['library_uuid']}"
                     }
-                    timeline.append(ts_event)
+                    yield ts_event
                 except KeyError as e:
                     print(f"WARNING: trace not parsed: {trace}. Error {e}")
         except Exception as e:
             print(f"ERROR while extracting timestamp from {file_in_logarchive_dir}. Reason: {str(e)}")
-            no_error = False
-    return no_error
 
 
-def __extract_ts_wifisecurity(case_folder: str) -> bool:
+def __extract_ts_wifisecurity(case_folder: str) -> Generator[dict, None, None]:
     filename = 'wifisecurity.json'
     try:
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -233,7 +220,7 @@ def __extract_ts_wifisecurity(case_folder: str) -> bool:
                     'timestamp_desc': 'SSID added to known secured WIFI list',
                     'extra_field_1': wifi['accc']
                 }
-                timeline.append(ts_event)
+                yield ts_event
 
                 # Event 2: modification
                 ts_event = {
@@ -243,14 +230,12 @@ def __extract_ts_wifisecurity(case_folder: str) -> bool:
                     'timestamp_desc': 'SSID modified into the secured WIFI list',
                     'extra_field_1': wifi['accc']
                 }
-                timeline.append(ts_event)
-        return True
+                yield ts_event
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}")
-        return False
 
 
-def __extract_ts_wifi_known_networks(case_folder: str) -> bool:
+def __extract_ts_wifi_known_networks(case_folder: str) -> Generator[dict, None, None]:
     filename = 'wifi_known_networks.json'
     try:
         with open(os.path.join(case_folder, filename), 'r') as fd:
@@ -267,7 +252,7 @@ def __extract_ts_wifi_known_networks(case_folder: str) -> bool:
                     'timestamp_desc': '%s added in known networks plist',
                     'extra_field_1': 'Add reason: %s' % item['AddReason']
                 }
-                timeline.append(ts_event)
+                yield ts_event
             except KeyError:
                 # some wifi networks do not have an AddedAt field
                 # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.")
                 pass
@@ -283,7 +268,7 @@ def __extract_ts_wifi_known_networks(case_folder: str) -> bool:
                     'timestamp_desc': '%s updated in known networks plist',
                     'extra_field_1': 'Add reason: %s' % item['AddReason']
                 }
-                timeline.append(ts_event)
+                yield ts_event
             except KeyError:
                 # some wifi networks do not have an UpdatedAt field
                 # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.")
                 pass
@@ -299,31 +284,27 @@ def __extract_ts_wifi_known_networks(case_folder: str) -> bool:
                     'timestamp_desc': '%s password modified in known networks plist',
                     'extra_field_1': 'AP mode: %s' % item['__OSSpecific__']['AP_MODE']
                 }
-                timeline.append(ts_event)
+                yield ts_event
             except KeyError:
                 # some wifi networks do not have a password modification date
                 # print(f"ERROR {filename} while extracting timestamp from {ssid}. Reason: {str(e)}. Record not inserted.")
                 pass
-        return True
     except Exception as e:
         print(f"ERROR while extracting timestamp from {filename}. Reason {str(e)}")
-        return False
 
 
 def analyse_path(case_folder: str, output_file: str = 'timeliner.jsonl') -> bool:
     # Get all the functions that start with '__extract_ts_'
     # and call these with the case_folder as parameter
-    # FIXME move the for loop within the file write and change to yield
-    for func in globals():
-        if func.startswith('__extract_ts_'):
-            globals()[func](case_folder)  # call the function
-
+    # do this using generators, as this eats much less memory and is just so much more efficient
     try:
        with open(output_file, 'w') as f:
-            for event in timeline:
-                line = json.dumps(event)
-                f.write(line)
-                f.write('\n')
+            for func in globals():
+                if func.startswith('__extract_ts_'):
+                    for event in globals()[func](case_folder):  # call the function
+                        line = json.dumps(event)
+                        f.write(line)
+                        f.write('\n')
     except Exception as e:
         print(f"ERROR: impossible to save timeline to {output_file}. Reason: {str(e)}")
        return False