diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 5314acf..5eb3436 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -6,6 +6,9 @@ name: Python application on: push: branches: [ '**' ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ "master" ] jobs: build: @@ -13,22 +16,19 @@ jobs: strategy: matrix: - python-version: [ 3.7, 3.8, 3.9, "3.10", "3.11" ] + python-version: [ 3.8, 3.9, "3.10", "3.11" ] os: [ ubuntu-latest, macos-latest, windows-latest ] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Lint with flake8 run: | python -m pip install flake8 - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82,F401,E741 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + flake8 . --max-line-length=127 - name: Verify sorted imports run: | python -m pip install isort diff --git a/harlogger/__main__.py b/harlogger/__main__.py index 0ac3401..4c4c0cc 100644 --- a/harlogger/__main__.py +++ b/harlogger/__main__.py @@ -1,105 +1,8 @@ -import json -import os -import posixpath -from urllib.parse import urlparse - import click -from pygments import highlight -from pygments.formatters.terminal256 import TerminalTrueColorFormatter -from pygments.lexers.textfmts import HttpLexer +from pymobiledevice3.cli.cli_common import Command from pymobiledevice3.lockdown import LockdownClient -from pymobiledevice3.services.os_trace import OsTraceService - - -def get_header_from_list(name, headers): - for header in headers: - if header['name'].lower() == name.lower(): - return header['value'] - return None - - -def is_in_insensitive_list(needle, haystack): - for h in haystack: - if needle.lower() == h.lower(): - return True - return False - - -def show_http_packet(http_packet, filter_headers): - buf = '' - version = 'HTTP/1.0' - if http_packet['httpVersion'] == 'h2': - version = 'HTTP/2.0' - - if 'url' in http_packet: - # request - url = urlparse(http_packet['url']) - uri = url.path - if url.query: - uri += f'?{url.query}' - - buf += f'{http_packet["method"]} {uri} {version}\r\n' - else: - # response - if http_packet['status'] == 0: - # isn't a real packet - return - buf += f'{version} {http_packet["status"]} {http_packet["statusText"]}\r\n' - - for header in http_packet['headers']: - if (filter_headers is not None) and (len(filter_headers) > 0) and \ - not is_in_insensitive_list(header['name'], filter_headers): - continue - buf += f'{header["name"]}: {header["value"]}\r\n' - - buf += '\r\n' - - content = {} - - if 'postData' in http_packet: - content = http_packet['postData'] - - if 'content' in http_packet: - content = http_packet['content'] - - print(highlight(buf, HttpLexer(), TerminalTrueColorFormatter(style='autumn'))) - - if 'text' in content: - print(content['text']) - - -def show_har_entry(entry, filter_headers=None, show_request=True, show_response=True): - filename = posixpath.basename(entry['filename']) - pid = entry['pid'] - - process = f'{filename}({pid})' - - if show_request: - request = entry['request'] - - print(f'➡️ {process} {request["method"]} {request["url"]}') - show_http_packet(request, filter_headers) - - if show_response: - response = entry['response'] - print(f'⬅️ {process} {response["status"]} {response["statusText"]}') - show_http_packet(response, filter_headers) - - -def parse_fields(message: str): - result = {} - - for line in message.split('\n'): - if ': ' not in line: - continue - - line = line.strip() - k, v = line.split(':', 1) - k = k.strip() - v = v.strip() - result[k] = v - return result +from harlogger.sniffers import Filters, SnifferPreference, SnifferProfile @click.group() @@ -107,147 +10,43 @@ def cli(): pass -@cli.command('profile') -@click.option('pids', '-p', '--pid', multiple=True, help='filter pid list') -@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list') +@cli.command('profile', cls=Command) +@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list') @click.option('--color/--no-color', default=True) +@click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list') +@click.option('images', '-i', '--image', multiple=True, help='filter image list') @click.option('--request/--no-request', is_flag=True, default=True, help='show requests') @click.option('--response/--no-response', is_flag=True, default=True, help='show responses') -def cli_profile(pids, process_names, color, request, response): +@click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination') +def cli_profile(lockdown: LockdownClient, pids, process_names, color, request, response, images, unique): """ - Sniff using CFNetowrkDiagnostics.mobileconfig profile. + Sniff using CFNetworkDiagnostics.mobileconfig profile. This requires the specific Apple profile to be installed for the sniff to work. """ - lockdown = LockdownClient() - - for entry in OsTraceService(lockdown).syslog(): - if entry.label is None or entry.label.subsystem != 'com.apple.CFNetwork' or \ - entry.label.category != 'Diagnostics': - continue - - if pids and (entry.pid not in pids): - continue - - if process_names and (posixpath.basename(entry.filename) not in process_names): - continue + filters = Filters(pids, process_names, images) + SnifferProfile(lockdown, filters=filters, request=request, response=response, color=color, unique=unique).sniff() - lines = entry.message.split('\n') - if len(lines) < 2: - continue - buf = '' - - if lines[1].strip().startswith('Protocol Enqueue: request') and request: - # request - print('➡️ ', end='') - fields = parse_fields(entry.message) - buf += f'{fields["Message"]}\n' - for name, value in fields.items(): - if name in ('Protocol Enqueue', 'Request', 'Message'): - continue - buf += f'{name}: {value}\n' - - elif lines[1].strip().startswith('Protocol Received: request') and response: - # response - print('⬅️ ', end='') - fields = parse_fields(entry.message) - buf += f'{fields["Response"]} ({fields["Protocol Received"]})\n' - for name, value in fields.items(): - if name in ('Protocol Received', 'Response'): - continue - buf += f'{name}: {value}\n' - - if buf: - if color: - print(highlight(buf, HttpLexer(), TerminalTrueColorFormatter(style='autumn'))) - else: - print(buf) - - -@cli.command('preference') -@click.option('--udid') +@cli.command('preference', cls=Command) @click.option('-o', '--out', type=click.File('w'), help='file to store the har entries into upon exit (ctrl+c)') -@click.option('pids', '-p', '--pid', multiple=True, help='filter pid list') +@click.option('pids', '-p', '--pid', type=click.INT, multiple=True, help='filter pid list') +@click.option('--color/--no-color', default=True) @click.option('process_names', '-pn', '--process-name', multiple=True, help='filter process name list') @click.option('images', '-i', '--image', multiple=True, help='filter image list') -@click.option('headers', '-h', '--header', multiple=True, help='filter header list') @click.option('--request/--no-request', is_flag=True, default=True, help='show requests') @click.option('--response/--no-response', is_flag=True, default=True, help='show responses') @click.option('-u', '--unique', is_flag=True, help='show only unique requests per image/pid/method/uri combination') -def cli_preference(udid, out, pids, process_names, images, headers, request, response, unique): +def cli_preference(lockdown: LockdownClient, out, pids, process_names, images, request, response, color, unique): """ Sniff using the secret com.apple.CFNetwork.plist configuration. This sniff includes the request/response body as well but requires the device to be jailbroken for the sniff to work """ - shown_set = set() - har = { - 'log': { - 'version': '0.1', - 'creator': { - 'name': 'remote-har-listener', - 'version': '0.1', - }, - 'entries': [], - } - } - - lockdown = LockdownClient(serial=udid) - os_trace_service = OsTraceService(lockdown) - incomplete = '' - - try: - for line in os_trace_service.syslog(): - if line.label is None: - continue - if line.label.category != 'HAR': - continue - - image = os.path.basename(line.image_name) - pid = line.pid - message = line.message - - if (len(pids) > 0) and (pid not in pids): - continue - - if (len(images) > 0) and (image not in images): - continue - - if process_names and (posixpath.basename(line.filename) not in process_names): - continue - - try: - entry = json.loads(incomplete + message) - incomplete = '' - except json.decoder.JSONDecodeError: - if message.startswith(''): - incomplete += message.split('', 1)[1] - continue - elif len(incomplete) > 0: - incomplete += message - continue - raise - - # artificial HAR information extracted from syslog line - entry['image'] = image - entry['pid'] = pid - entry['filename'] = line.filename - - if unique: - entry_hash = (image, pid, entry['request']['method'], entry['request']['url']) - - if entry_hash in shown_set: - continue - - shown_set.add(entry_hash) - show_har_entry(entry, filter_headers=headers, show_request=request, show_response=response) - - har['log']['entries'].append(entry) - except KeyboardInterrupt: - if out: - out.write(json.dumps(har, indent=4)) + filters = Filters(pids, process_names, images) + SnifferPreference(lockdown, filters=filters, request=request, response=response, out=out, color=color, + unique=unique).sniff() if __name__ == '__main__': diff --git a/harlogger/exceptions.py b/harlogger/exceptions.py new file mode 100644 index 0000000..a574af0 --- /dev/null +++ b/harlogger/exceptions.py @@ -0,0 +1,6 @@ +class HarloggerException(Exception): + pass + + +class HTTPParseError(HarloggerException): + pass diff --git a/harlogger/haralyzer_patches.py b/harlogger/haralyzer_patches.py new file mode 100644 index 0000000..28b734a --- /dev/null +++ b/harlogger/haralyzer_patches.py @@ -0,0 +1,19 @@ +from functools import cached_property +from typing import Optional + +from haralyzer.http import Response + + +class ResponseHook(Response): + @cached_property + def text(self) -> Optional[str]: + """ + :return: Response body + :rtype: str + """ + content = self.raw_entry["content"] + return content.get("_textBase64", content.get("text")) + + +def add_text_base64_support_for_haralyzer() -> None: + setattr(Response, 'text', ResponseHook.text) diff --git a/harlogger/http_transaction.py b/harlogger/http_transaction.py new file mode 100644 index 0000000..8b5771b --- /dev/null +++ b/harlogger/http_transaction.py @@ -0,0 +1,75 @@ +from abc import abstractmethod +from typing import List, Mapping, MutableMapping, Union + +from cached_property import cached_property + + +class HTTPTransaction: + def __init__(self, url: str, http_version: str, headers: Mapping = None, body: Union[str, bytes] = None): + self.url = url + self.http_version = http_version + self.headers = headers + self.body = body + + @staticmethod + def parse_transaction(message: str) -> 'HTTPTransaction': + res = None + parsed_transaction = HTTPTransaction._parse_fields(message=message) + + if 'Protocol Enqueue' in parsed_transaction: + method, url, http_version = parsed_transaction.pop('Protocol Enqueue').split()[1:] + parsed_transaction.pop('Message') + parsed_transaction.pop('Request') + res = HTTPRequest(url, method, http_version, parsed_transaction) + + elif 'Protocol Received' in parsed_transaction: + url = parsed_transaction.pop('Protocol Received').split()[2] + http_version, status, *status_text = parsed_transaction.pop('Response').split(' ', 2) + res = HTTPResponse(url, http_version, status, status_text, parsed_transaction) + return res + + @staticmethod + def _parse_fields(message: str) -> MutableMapping: + result = {} + for line in message.split('\n'): + if ': ' not in line: + continue + + line = line.strip() + k, v = line.split(':', 1) + k = k.strip() + v = v.strip() + result[k] = v + return result + + @abstractmethod + def _start_line(self) -> str: + pass + + @cached_property + def formatted(self) -> str: + formatted_headers = '' + for k, v in self.headers.items(): + formatted_headers += f'{k}: {v}\n' + return f'{self._start_line()}\n{formatted_headers}\n{self.body if self.body else ""}\n' + + +class HTTPRequest(HTTPTransaction): + def __init__(self, url: str, method: str, http_version: str, headers: Mapping = None, + body: Union[str, bytes] = None): + super().__init__(url, http_version, headers, body) + self.method = method + + def _start_line(self) -> str: + return f'{self.method} {self.url} {self.http_version}' + + +class HTTPResponse(HTTPTransaction): + def __init__(self, url: str, http_version: str, status: int, status_text: List, headers: Mapping = None, + body: Union[str, bytes] = None): + super().__init__(url, http_version, headers, body) + self.status = status + self.status_text = ' '.join(status_text) + + def _start_line(self) -> str: + return f'{self.http_version} {self.status} {self.status_text}' diff --git a/harlogger/sniffers.py b/harlogger/sniffers.py new file mode 100644 index 0000000..bdff281 --- /dev/null +++ b/harlogger/sniffers.py @@ -0,0 +1,184 @@ +import json +import os +import posixpath +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import IO, Tuple + +from haralyzer import HarEntry +from pygments import highlight +from pygments.formatters.terminal256 import TerminalTrueColorFormatter +from pygments.lexers.textfmts import HttpLexer +from pymobiledevice3.lockdown import LockdownClient +from pymobiledevice3.services.os_trace import OsTraceService + +from harlogger.exceptions import HTTPParseError +from harlogger.haralyzer_patches import add_text_base64_support_for_haralyzer +from harlogger.http_transaction import HTTPRequest, HTTPResponse, HTTPTransaction + +add_text_base64_support_for_haralyzer() + + +@dataclass +class EntryHash: + pid: int + process_name: str + image: str + url: str + + +@dataclass +class Filters: + pids: Tuple = None + process_names: Tuple = None + images: Tuple = None + + +class SnifferBase(ABC): + def __init__(self, lockdown: LockdownClient, filters: Filters = None, unique: bool = False, request: bool = True, + response: bool = True, color: bool = True, style: str = 'autumn'): + self._lockdown = lockdown + self._os_trace_service = OsTraceService(self._lockdown) + self._filters = filters + self._request = request + self._response = response + self._unique = unique + self._color = color + self._style = style + self._shown_list = [] + + def show(self, entry_hash: EntryHash, transaction: str, direction: str, extra: str = '') -> None: + if self._unique: + if entry_hash in self._shown_list: + return + else: + self._shown_list.append(entry_hash) + + print(f'{direction} {entry_hash.process_name} ({entry_hash.pid}) {extra}') + if self._color: + print(highlight(transaction, HttpLexer(), TerminalTrueColorFormatter(style=self._style))) + else: + print(transaction) + + def should_keep(self, entry_hash: EntryHash) -> bool: + if self._filters.pids and entry_hash.pid in self._filters.pids: + return False + + if self._filters.images and entry_hash.image in self._filters.images: + return False + + if self._filters.process_names and entry_hash.process_name in self._filters.process_names: + return False + return True + + @abstractmethod + def sniff(self) -> None: + pass + + +class SnifferPreference(SnifferBase): + """ + Sniff using the secret com.apple.CFNetwork.plist configuration. + + This sniff includes the request/response body as well but requires the device to be jailbroken for + the sniff to work + """ + + def __init__(self, lockdown: LockdownClient, filters: Filters = None, unique: bool = False, request: bool = True, + response: bool = True, color: bool = True, style: str = 'autumn', out: IO = None): + super().__init__(lockdown, filters, unique, request, response, color, style) + self.out = out + self.har = { + 'log': { + 'version': '0.1', + 'creator': { + 'name': 'remote-har-listener', + 'version': '0.1', + }, + 'entries': [], + } + } + + def sniff(self) -> None: + try: + self._sniff() + except KeyboardInterrupt: + if self.out: + self.out.write(json.dumps(self.har, indent=4)) + + def _sniff(self) -> None: + incomplete = '' + for line in self._os_trace_service.syslog(): + if line.label is None: + continue + if line.label.category != 'HAR': + continue + + message = line.message + + try: + entry = HarEntry(json.loads(incomplete + message)) + incomplete = '' + entry_hash = EntryHash(line.pid, + posixpath.basename(line.filename), + os.path.basename(line.image_name), + entry.url) + + if not self.should_keep(entry_hash): + continue + + self.har['log']['entries'].append(entry) + if self._request: + self.show(entry_hash, entry.request.formatted, '➡️') + if self._response: + self.show(entry_hash, entry.response.formatted, '⬅️') + + except json.decoder.JSONDecodeError: + if message.startswith(''): + incomplete += message.split('', 1)[1] + continue + elif len(incomplete) > 0: + incomplete += message + continue + + +class SnifferProfile(SnifferBase): + """ + Sniff using CFNetworkDiagnostics.mobileconfig profile. + + This requires the specific Apple profile to be installed for the sniff to work. + """ + + def __init__(self, lockdown: LockdownClient, filters: Filters = None, unique: bool = False, request: bool = True, + response: bool = True, color: bool = True, style: str = 'autumn'): + super().__init__(lockdown, filters, unique, request, response, color, style) + + def sniff(self): + for entry in self._os_trace_service.syslog(): + if entry.label is None or entry.label.subsystem != 'com.apple.CFNetwork' or \ + entry.label.category != 'Diagnostics': + continue + + if 'Protocol Received' not in entry.message and \ + 'Protocol Enqueue' not in entry.message: + continue + + lines = entry.message.split('\n') + if len(lines) < 2: + continue + + http_transaction = HTTPTransaction.parse_transaction(entry.message) + if not http_transaction: + raise HTTPParseError() + entry_hash = EntryHash(entry.pid, + posixpath.basename(entry.filename), + os.path.basename(entry.image_name), + http_transaction.url) + + if not self.should_keep(entry_hash): + continue + + if self._request and isinstance(http_transaction, HTTPRequest): + self.show(entry_hash, http_transaction.formatted, '➡️') + if self._response and isinstance(http_transaction, HTTPResponse): + self.show(entry_hash, http_transaction.formatted, '⬅️', f'({http_transaction.url})') diff --git a/pyproject.toml b/pyproject.toml index ab17de5..ac279e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,20 +3,22 @@ name = "harlogger" version = "2.1.3" description = "Simple utlity for sniffing decrypted HTTP/HTTPS traffic on an iOS device (either jailbroken or not)" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.8" license = { file = "LICENSE" } keywords = ["ios", "http", "https", "har", "sniffer", "jailbroken"] authors = [ - { name = "doronz88", email = "doron88@gmail.com" } + { name = "doronz88", email = "doron88@gmail.com" }, + { name = "netanelc305", email = "netanelc305@protonmail.com" } + ] maintainers = [ - { name = "doronz88", email = "doron88@gmail.com" } + { name = "doronz88", email = "doron88@gmail.com" }, + { name = "netanelc305", email = "netanelc305@protonmail.com" } ] classifiers = [ "Development Status :: 5 - Production/Stable", "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", diff --git a/requirements.txt b/requirements.txt index a0e1b1f..38a1fc8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ click -pymobiledevice3>=1.29.1 +pymobiledevice3>=2.0.2 pygments +haralyzer>=2.4.0 \ No newline at end of file