Skip to content

Commit

Permalink
chg: [analysers] wifi_geolocation
Browse files Browse the repository at this point in the history
  • Loading branch information
cvandeplas committed Jun 5, 2024
1 parent 7499200 commit 6075a36
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 27 deletions.
32 changes: 32 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@
"args": "analyse apps 1",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: analyse.py analyse wifi_geolocation 1",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/analyse.py",
"args": "analyse wifi_geolocation 1",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: analyse.py analyse wifi_geolocation_kml 1",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/analyse.py",
"args": "analyse wifi_geolocation_kml 1",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: analyse.py analyse timeliner 1",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/analyse.py",
"args": "analyse timeliner 1",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: logarchive.py",
"type": "debugpy",
Expand All @@ -45,6 +69,14 @@
"args": "list all",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: parsing.py parse demo_parser",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/parsing.py",
"args": "parse demo_parser 1",
"cwd": "${workspaceFolder}/"
},
{
"name": "Python Debugger: parsing.py parse itunesstore",
"type": "debugpy",
Expand Down
13 changes: 8 additions & 5 deletions analyse.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def list_analysers(folder):
spec.loader.exec_module(module)
line = [analyser[:-3], module.analyser_description]
lines.append(line)
except: # noqa: E722
except AttributeError:
continue

headers = ['Analyser Name', 'Analyser Description']
Expand All @@ -58,14 +58,14 @@ def list_analysers(folder):

def analyse(analyser, caseid):
# Load parser module
spec = importlib.util.spec_from_file_location(analyser[:-3], config.analysers_folder + "/" + analyser + '.py')
print(spec, file=sys.stderr)
spec = importlib.util.spec_from_file_location(analyser[:-3], os.path.join(config.analysers_folder, analyser + '.py'))
# print(spec, file=sys.stderr)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# building command
parse_data_path = "%s/%s/" % (config.parsed_data_folder, caseid)
output_file = config.parsed_data_folder + caseid + '/' + analyser + "." + module.analyser_format
output_file = os.path.join(config.parsed_data_folder, caseid, analyser + "." + module.analyser_format)
command = "module.%s('%s', '%s')" % (module.analyser_call, parse_data_path, output_file)
result = eval(command)

Expand All @@ -85,7 +85,10 @@ def allanalysers(caseid):
try:
print(f'Trying: {analyser[:-3]}', file=sys.stderr)
analyse(analyser[:-3], caseid)
except: # noqa: E722
except Exception as e: # noqa: E722
import traceback
print(f"Error: Problem while executing module {analyser[:-3]}. Reason: {str(e)}", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
continue
os.chdir(prev_folder)
return 0
Expand Down
28 changes: 24 additions & 4 deletions analysers/wifi_gelocation.py → analysers/wifi_geolocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
import dateutil.parser
from optparse import OptionParser

import os
import gpxpy
import gpxpy.gpx

Expand All @@ -21,8 +21,25 @@
# ----- DO NOT DELETE ----#

analyser_description = "Generate GPS Exchange (GPX) of wifi geolocations"
analyser_call = "generate_gpx"
analyser_format = "json"
analyser_call = "analyse_path"
analyser_format = "gpx"


def analyse_path(case_folder: str, outfile: str = "wifi-geolocations.gpx") -> bool:
potential_source_files = ['wifinetworks/WiFi_com.apple.wifi.known-networks.plist.json', 'plists/WiFi_com.apple.wifi.known-networks.plist.json', 'wifi_known_networks.json']
input_file_path = None
for fname in potential_source_files:
input_file_path = os.path.join(case_folder, fname)
if os.path.isfile(input_file_path):
break
if not input_file_path:
# TODO we could call the parser and generate the file for us...and then parse it...
raise FileNotFoundError(f"Could not find any of the potential source files: {potential_source_files}.")

# we have a valid file_path and can generate the gpx file
with open(input_file_path, 'r') as f:
json_data = json.load(f)
return generate_gpx_from_known_networks_json(json_data=json_data, outfile=outfile)


def generate_gpx(jsonfile: str, outfile: str = "wifi-geolocations.gpx"):
Expand All @@ -45,11 +62,14 @@ def generate_gpx(jsonfile: str, outfile: str = "wifi-geolocations.gpx"):
sys.exit(-2)
json_data = json_entry # start here


def generate_gpx_from_known_networks_json(json_data: str, outfile: str):

# Create new GPX object
gpx = gpxpy.gpx.GPX()

for network_name, network_data in json_data.items():
ssid = network_name
ssid = network_data.get('SSID', network_name)
# timestamps are always tricky
timestamp_str = network_data.get('AddedAt', '')
if config.debug:
Expand Down
6 changes: 4 additions & 2 deletions parsers/demo_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def parse_path(path: str) -> list | dict:
return json_object


def parse_path_to_folder(path: str, output: str) -> bool:
def parse_path_to_folder(path: str, output_folder: str) -> bool:
'''
this is the function that will be called
'''
Expand All @@ -45,7 +45,9 @@ def parse_path_to_folder(path: str, output: str) -> bool:
for log_file in log_files:
pass
# ideally stream to the file directly
with open(os.path.join(output, "demo_output.json"), "w") as f:
output_folder = os.path.join(output_folder, __name__.split('.')[-1])
os.makedirs(output_folder, exist_ok=True)
with open(os.path.join(output_folder, "demo_output.json"), "w") as f:
json.dump(json_object, f)
return True
except Exception as e:
Expand Down
6 changes: 4 additions & 2 deletions parsers/logarchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ def parse_path(path: str) -> list | dict:
return {'error': 'No system_logs.logarchive/ folder found in logs/ directory'}


def parse_path_to_folder(path: str, output: str) -> bool:
def parse_path_to_folder(path: str, output_folder: str) -> bool:
try:
result = get_logs(get_log_files(path)[0], output=output)
output_folder = os.path.join(output_folder, "logarchive")
os.makedirs(output_folder, exist_ok=True)
result = get_logs(get_log_files(path)[0], output=output_folder)
if len(result['data']) > 0:
return True
else:
Expand Down
7 changes: 4 additions & 3 deletions parsers/plists.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ def parse_path(path: str) -> dict:
return result


def parse_path_to_folder(path: str, output: str) -> bool:
os.makedirs(output, exist_ok=True)
def parse_path_to_folder(path: str, output_folder: str) -> bool:
output_folder = os.path.join(output_folder, __name__.split('.')[-1])
os.makedirs(output_folder, exist_ok=True)
for logfile in get_log_files(path):
try:
json_data = misc.load_plist_file_as_json(logfile)
except Exception as e:
json_data = {"error": str(e)}
end_of_path = logfile[len(path):].lstrip(os.path.sep) # take the path after the root path
output_filename = end_of_path.replace(os.path.sep, '_') + '.json' # replace / with _ in the path
with open(os.path.join(output, output_filename), 'w') as f:
with open(os.path.join(output_folder, output_filename), 'w') as f:
json.dump(json_data, f, indent=4)
9 changes: 8 additions & 1 deletion parsers/wifi_known_networks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os
import glob
import utils.misc as misc

import json

parser_description = "Parsing Known Wifi Networks plist file"

Expand All @@ -29,6 +29,13 @@ def parse_path(path: str) -> list | dict:
return misc.load_plist_file_as_json(get_log_files(path)[0])


def parse_path_to_folder(path: str, output_folder: str) -> bool:
result = parse_path(path)
output_file = os.path.join(output_folder, f"{__name__.split('.')[-1]}.json")
with open(output_file, 'w') as f:
json.dump(result, f, indent=4)


'''
code usefull for future printing function
Expand Down
7 changes: 4 additions & 3 deletions parsers/wifinetworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ def parse_path(path: str) -> dict:
return result


def parse_path_to_folder(path: str, output: str) -> bool:
os.makedirs(output, exist_ok=True)
def parse_path_to_folder(path: str, output_folder: str) -> bool:
output_folder = os.path.join(output_folder, __name__.split('.')[-1])
os.makedirs(output_folder, exist_ok=True)
for logfile in get_log_files(path):
try:
json_data = parse_file(logfile)
except Exception as e:
json_data = {"error": str(e)}
end_of_path = logfile[len(path):].lstrip(os.path.sep) # take the path after the root path
output_filename = end_of_path.replace(os.path.sep, '_') + '.json' # replace / with _ in the path
with open(os.path.join(output, output_filename), 'w') as f:
with open(os.path.join(output_folder, output_filename), 'w') as f:
json.dump(json_data, f, indent=4)
6 changes: 3 additions & 3 deletions parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def parse(parser, case_id):
# print(json.dumps(case, indent=4), file=sys.stderr) #debug

# Load parser module
spec = importlib.util.spec_from_file_location(parser[:-3], os.path.join(config.parsers_folder, parser) + '.py')
spec = importlib.util.spec_from_file_location(parser, os.path.join(config.parsers_folder, parser) + '.py')
# print(spec, file=sys.stderr)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
Expand All @@ -123,9 +123,9 @@ def parse(parser, case_id):
log_root_path = os.path.join(case_folder, os.listdir(case_folder).pop())

if hasattr(module, 'parse_path_to_folder'):
output_folder = os.path.join(config.parsed_data_folder, case_id, parser)
output_folder = os.path.join(config.parsed_data_folder, case_id)
os.makedirs(output_folder, exist_ok=True)
result = module.parse_path_to_folder(path=log_root_path, output=output_folder)
result = module.parse_path_to_folder(path=log_root_path, output_folder=output_folder)
print(f'Execution finished, output saved in: {output_folder}', file=sys.stderr)
else: # if the module cannot (yet) save directly to a folder, we wrap around by doing it ourselves
# parsers that output in the result variable
Expand Down
24 changes: 24 additions & 0 deletions tests/test_analysers_wifi_geolocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from analysers.wifi_geolocation import analyse_path
from parsers.wifi_known_networks import parse_path_to_folder, get_log_files
from tests import SysdiagnoseTestCase
import unittest
import os
import tempfile


class TestAnalysersWifiGeolocation(SysdiagnoseTestCase):

def test_analyse_wifi_geolocation(self):
for log_root_path in self.log_root_paths:
files = get_log_files(log_root_path)
self.assertTrue(len(files) > 0)
print(f'Parsing {files}')
with tempfile.TemporaryDirectory() as tmp_outpath:
parse_path_to_folder(log_root_path, output_folder=tmp_outpath)
output_file = os.path.join(tmp_outpath, 'wifi_geolocation.gpx')
analyse_path(case_folder=tmp_outpath, outfile=output_file)
self.assertTrue(os.path.isfile(output_file))


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion tests/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def list_all_parsers(self):
yield parser

def test_parsers_filestructure(self):
required_functions = ['get_log_files', 'parse_path'] # TODO add parse_path_to_folder(path: str, output: str) -> bool:
required_functions = ['get_log_files', 'parse_path'] # TODO add parse_path_to_folder(path: str, output_folder: str) -> bool:
required_variables = ['parser_description']

parsers = self.list_all_parsers()
Expand Down
6 changes: 3 additions & 3 deletions tests/test_parsers_logarchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ def test_get_logs_outputdir(self):

with tempfile.TemporaryDirectory() as tmp_outpath:
print(f'Parsing {folders} to {tmp_outpath}')
result = parse_path_to_folder(log_root_path, output=tmp_outpath)
result = parse_path_to_folder(log_root_path, output_folder=tmp_outpath)
# check if folder is not empty
self.assertNotEqual(os.listdir(tmp_outpath), [])
# result should contain at least one entry (linux = stdout, mac = mention it's saved to a file)
self.assertTrue(result)

if (platform.system() == "Darwin"):
self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "logarchive.json")))
self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "logarchive", "logarchive.json")))
else:
self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "liveData.json")))
self.assertTrue(os.path.isfile(os.path.join(tmp_outpath, "logarchive", "liveData.json")))

def test_get_logs_result(self):
for log_root_path in self.log_root_paths:
Expand Down

0 comments on commit 6075a36

Please sign in to comment.