Skip to content

Commit

Permalink
Merge pull request #5 from doronz88/refactor/everything
Browse files Browse the repository at this point in the history
Refactor/everything
  • Loading branch information
doronz88 authored Jul 12, 2023
2 parents 1da7076 + 066ae78 commit 144aae7
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 231 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ name: Python application
on:
push:
branches: [ '**' ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master" ]

jobs:
build:
runs-on: ${{ matrix.os }}

strategy:
matrix:
python-version: [ 3.7, 3.8, 3.9, "3.10", "3.11" ]
python-version: [ 3.8, 3.9, "3.10", "3.11" ]
os: [ ubuntu-latest, macos-latest, windows-latest ]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Lint with flake8
run: |
python -m pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82,F401,E741 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
flake8 . --max-line-length=127
- name: Verify sorted imports
run: |
python -m pip install isort
Expand Down
237 changes: 18 additions & 219 deletions harlogger/__main__.py
Original file line number Diff line number Diff line change
@@ -1,253 +1,52 @@
import json
import os
import posixpath
from urllib.parse import urlparse

import click
from pygments import highlight
from pygments.formatters.terminal256 import TerminalTrueColorFormatter
from pygments.lexers.textfmts import HttpLexer
from pymobiledevice3.cli.cli_common import Command
from pymobiledevice3.lockdown import LockdownClient
from pymobiledevice3.services.os_trace import OsTraceService


def get_header_from_list(name, headers):
for header in headers:
if header['name'].lower() == name.lower():
return header['value']
return None


def is_in_insensitive_list(needle, haystack):
for h in haystack:
if needle.lower() == h.lower():
return True
return False


def show_http_packet(http_packet, filter_headers):
buf = ''
version = 'HTTP/1.0'
if http_packet['httpVersion'] == 'h2':
version = 'HTTP/2.0'

if 'url' in http_packet:
# request
url = urlparse(http_packet['url'])
uri = url.path
if url.query:
uri += f'?{url.query}'

buf += f'{http_packet["method"]} {uri} {version}\r\n'
else:
# response
if http_packet['status'] == 0:
# isn't a real packet
return
buf += f'{version} {http_packet["status"]} {http_packet["statusText"]}\r\n'

for header in http_packet['headers']:
if (filter_headers is not None) and (len(filter_headers) > 0) and \
not is_in_insensitive_list(header['name'], filter_headers):
continue
buf += f'{header["name"]}: {header["value"]}\r\n'

buf += '\r\n'

content = {}

if 'postData' in http_packet:
content = http_packet['postData']

if 'content' in http_packet:
content = http_packet['content']

print(highlight(buf, HttpLexer(), TerminalTrueColorFormatter(style='autumn')))

if 'text' in content:
print(content['text'])


def show_har_entry(entry, filter_headers=None, show_request=True, show_response=True):
filename = posixpath.basename(entry['filename'])
pid = entry['pid']

process = f'{filename}({pid})'

if show_request:
request = entry['request']

print(f'➡️ {process} {request["method"]} {request["url"]}')
show_http_packet(request, filter_headers)

if show_response:
response = entry['response']
print(f'⬅️ {process} {response["status"]} {response["statusText"]}')
show_http_packet(response, filter_headers)


def parse_fields(message: str):
result = {}

for line in message.split('\n'):
if ': ' not in line:
continue

line = line.strip()
k, v = line.split(':', 1)
k = k.strip()
v = v.strip()
result[k] = v

return result
from harlogger.sniffers import Filters, SnifferPreference, SnifferProfile


@click.group()
def cli():
pass


@cli.command('profile')
@click.option('pids', '-p', '--pid', multiple=True, help='filter pid list')
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@cli.command('profile', cls=Command)
@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list')
@click.option('--color/--no-color', default=True)
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@click.option('images', '-i', '--image', multiple=True, help='filter image list')
@click.option('--request/--no-request', is_flag=True, default=True, help='show requests')
@click.option('--response/--no-response', is_flag=True, default=True, help='show responses')
def cli_profile(pids, process_names, color, request, response):
@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination')
def cli_profile(lockdown: LockdownClient, pids, process_names, color, request, response, images, unique):
"""
Sniff using CFNetowrkDiagnostics.mobileconfig profile.
Sniff using CFNetworkDiagnostics.mobileconfig profile.
This requires the specific Apple profile to be installed for the sniff to work.
"""
lockdown = LockdownClient()

for entry in OsTraceService(lockdown).syslog():
if entry.label is None or entry.label.subsystem != 'com.apple.CFNetwork' or \
entry.label.category != 'Diagnostics':
continue

if pids and (entry.pid not in pids):
continue

if process_names and (posixpath.basename(entry.filename) not in process_names):
continue
filters = Filters(pids, process_names, images)
SnifferProfile(lockdown, filters=filters, request=request, response=response, color=color, unique=unique).sniff()

lines = entry.message.split('\n')
if len(lines) < 2:
continue

buf = ''

if lines[1].strip().startswith('Protocol Enqueue: request') and request:
# request
print('➡️ ', end='')
fields = parse_fields(entry.message)
buf += f'{fields["Message"]}\n'
for name, value in fields.items():
if name in ('Protocol Enqueue', 'Request', 'Message'):
continue
buf += f'{name}: {value}\n'

elif lines[1].strip().startswith('Protocol Received: request') and response:
# response
print('⬅️ ', end='')
fields = parse_fields(entry.message)
buf += f'{fields["Response"]} ({fields["Protocol Received"]})\n'
for name, value in fields.items():
if name in ('Protocol Received', 'Response'):
continue
buf += f'{name}: {value}\n'

if buf:
if color:
print(highlight(buf, HttpLexer(), TerminalTrueColorFormatter(style='autumn')))
else:
print(buf)


@cli.command('preference')
@click.option('--udid')
@cli.command('preference', cls=Command)
@click.option('-o', '--out', type=click.File('w'), help='file to store the har entries into upon exit (ctrl+c)')
@click.option('pids', '-p', '--pid', multiple=True, help='filter pid list')
@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list')
@click.option('--color/--no-color', default=True)
@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list')
@click.option('images', '-i', '--image', multiple=True, help='filter image list')
@click.option('headers', '-h', '--header', multiple=True, help='filter header list')
@click.option('--request/--no-request', is_flag=True, default=True, help='show requests')
@click.option('--response/--no-response', is_flag=True, default=True, help='show responses')
@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination')
def cli_preference(udid, out, pids, process_names, images, headers, request, response, unique):
def cli_preference(lockdown: LockdownClient, out, pids, process_names, images, request, response, color, unique):
"""
Sniff using the secret com.apple.CFNetwork.plist configuration.
This sniff includes the request/response body as well but requires the device to be jailbroken for
the sniff to work
"""
shown_set = set()
har = {
'log': {
'version': '0.1',
'creator': {
'name': 'remote-har-listener',
'version': '0.1',
},
'entries': [],
}
}

lockdown = LockdownClient(serial=udid)
os_trace_service = OsTraceService(lockdown)
incomplete = ''

try:
for line in os_trace_service.syslog():
if line.label is None:
continue
if line.label.category != 'HAR':
continue

image = os.path.basename(line.image_name)
pid = line.pid
message = line.message

if (len(pids) > 0) and (pid not in pids):
continue

if (len(images) > 0) and (image not in images):
continue

if process_names and (posixpath.basename(line.filename) not in process_names):
continue

try:
entry = json.loads(incomplete + message)
incomplete = ''
except json.decoder.JSONDecodeError:
if message.startswith('<incomplete>'):
incomplete += message.split('<incomplete>', 1)[1]
continue
elif len(incomplete) > 0:
incomplete += message
continue
raise

# artificial HAR information extracted from syslog line
entry['image'] = image
entry['pid'] = pid
entry['filename'] = line.filename

if unique:
entry_hash = (image, pid, entry['request']['method'], entry['request']['url'])

if entry_hash in shown_set:
continue

shown_set.add(entry_hash)
show_har_entry(entry, filter_headers=headers, show_request=request, show_response=response)

har['log']['entries'].append(entry)
except KeyboardInterrupt:
if out:
out.write(json.dumps(har, indent=4))
filters = Filters(pids, process_names, images)
SnifferPreference(lockdown, filters=filters, request=request, response=response, out=out, color=color,
unique=unique).sniff()


if __name__ == '__main__':
Expand Down
6 changes: 6 additions & 0 deletions harlogger/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class HarloggerException(Exception):
pass


class HTTPParseError(HarloggerException):
pass
19 changes: 19 additions & 0 deletions harlogger/haralyzer_patches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from functools import cached_property
from typing import Optional

from haralyzer.http import Response


class ResponseHook(Response):
@cached_property
def text(self) -> Optional[str]:
"""
:return: Response body
:rtype: str
"""
content = self.raw_entry["content"]
return content.get("_textBase64", content.get("text"))


def add_text_base64_support_for_haralyzer() -> None:
setattr(Response, 'text', ResponseHook.text)
Loading

0 comments on commit 144aae7

Please sign in to comment.