From a312492d25df759be28daf066895b0133e3d936a Mon Sep 17 00:00:00 2001 From: Quentin Kaiser Date: Fri, 14 Apr 2023 14:07:04 +0200 Subject: [PATCH] feat(reporting): report meta-data information about chunks. Allow handlers to provide a dict value as part of a ValidChunk metadata attribute. That dictionnary can contain any relevant metadata information from the perspective of the handler, but we advise handler writers to report parsed information such as header values. This metadata dict is later reported as part of our ChunkReports and available in the JSON report file if the user requested one. The idea is to expose metadata to further analysis steps through the unblob report. For example, a binary analysis toolkit would read the load address and architecture from a uImage chunk to analyze the file extracted from that chunk with the right settings. A note on the 'as_dict' implementation. The initial idea was to implement it in dissect.cstruct (see https://github.com/fox-it/dissect.cstruct/pull/29), but due to expected changes in the project's API I chose to implement it in unblob so we're not dependent on another project. --- tests/test_models.py | 124 +++++++++++--------- tests/test_report.py | 173 ++++++++++++++++++++++++++++ unblob/handlers/archive/sevenzip.py | 12 +- unblob/models.py | 30 ++++- unblob/report.py | 1 + 5 files changed, 280 insertions(+), 60 deletions(-) diff --git a/tests/test_models.py b/tests/test_models.py index 927c66703f..050a9e0560 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,15 @@ import pytest from unblob.file_utils import InvalidInputFormat -from unblob.models import Chunk, ProcessResult, Task, TaskResult, UnknownChunk, to_json +from unblob.models import ( + Chunk, + ProcessResult, + Task, + TaskResult, + UnknownChunk, + ValidChunk, + to_json, +) from unblob.report import ( ChunkReport, ExtractCommandFailedReport, @@ -153,56 +161,57 @@ def test_process_result_conversion(self): decoded_report = json.loads(json_text) assert decoded_report == [ { - "__typename__": "TaskResult", + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, "reports": [ { - "__typename__": "StatReport", + "path": "/nonexistent", + "size": 384, "is_dir": False, "is_file": True, "is_link": False, "link_target": None, - "path": "/nonexistent", - "size": 384, + "__typename__": "StatReport", }, { - "__typename__": "FileMagicReport", "magic": "Zip archive data, at least v2.0 to extract", "mime_type": "application/zip", + "__typename__": "FileMagicReport", }, { - "__typename__": "HashReport", "md5": "9019fcece2433ad7f12c077e84537a74", "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", }, { - "__typename__": "ChunkReport", - "end_offset": 384, - "extraction_reports": [], - "handler_name": "zip", "id": "test_basic_conversion:id", - "is_encrypted": False, - "size": 384, + "handler_name": "zip", "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", }, ], "subtasks": [ { - "__typename__": "Task", - "blob_id": "test_basic_conversion:id", + "path": "/extractions/nonexistent_extract", "depth": 314, + "blob_id": "test_basic_conversion:id", "is_multi_file": False, - "path": "/extractions/nonexistent_extract", + "__typename__": "Task", } ], - "task": { - "__typename__": "Task", - "blob_id": "", - "depth": 0, - "is_multi_file": False, - "path": "/nonexistent", - }, - }, + "__typename__": "TaskResult", + } ] def test_exotic_command_output(self): @@ -218,35 +227,44 @@ def test_exotic_command_output(self): decoded_report = json.loads(json_text) assert decoded_report == { - "__typename__": "ExtractCommandFailedReport", - "command": "dump all bytes", - "exit_code": 1, "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08" + "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16" + "\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+," + "-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]" + "^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81" + "\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89" + "\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91" + "\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99" + "\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1" + "\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9" + "\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1" + "\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9" + "\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1" + "\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9" + "\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1" + "\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9" + "\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1" + "\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9" + "\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1" + "\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9" + "\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } + + @pytest.mark.parametrize( + "metadata", + [ + pytest.param(1, id="metadata_int"), + pytest.param(0.2, id="metadata_float"), + pytest.param(True, id="metadata_bool"), + pytest.param([1, 2], id="metadata_list"), + pytest.param((1, 2), id="metadata_tuple"), + ], + ) + def test_invalid_metadata(self, metadata): + with pytest.raises(ValueError, match="Can only convert dict or Instance"): + ValidChunk(start_offset=0, end_offset=100, metadata=metadata) diff --git a/tests/test_report.py b/tests/test_report.py index 0a2ee0d39d..ce52a30d12 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -11,6 +11,7 @@ from unblob.processing import ExtractionConfig, process_file from unblob.report import ( ChunkReport, + ExtractCommandFailedReport, FileMagicReport, HashReport, StatReport, @@ -48,6 +49,178 @@ def test_process_file_report_output_is_valid_json( assert len(report) +class Test_ProcessResult_to_json: # noqa: N801 + def test_simple_conversion(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + chunk_id = "test_basic_conversion:id" + + task_result.add_report( + StatReport( + path=task.path, + size=384, + is_dir=False, + is_file=True, + is_link=False, + link_target=None, + ) + ) + task_result.add_report( + FileMagicReport( + magic="Zip archive data, at least v2.0 to extract", + mime_type="application/zip", + ) + ) + task_result.add_report( + HashReport( + md5="9019fcece2433ad7f12c077e84537a74", + sha1="36998218d8f43b69ef3adcadf2e8979e81eed166", + sha256="7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + ) + ) + task_result.add_report( + ChunkReport( + id=chunk_id, + handler_name="zip", + start_offset=0, + end_offset=384, + size=384, + is_encrypted=False, + metadata={}, + extraction_reports=[], + ) + ) + task_result.add_subtask( + Task( + path=Path("/extractions/nonexistent_extract"), + depth=314, + blob_id=chunk_id, + ) + ) + + json_text = ProcessResult(results=[task_result]).to_json() + + # output must be a valid json string + assert isinstance(json_text, str) + + # that can be loaded back + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "path": "/nonexistent", + "size": 384, + "is_dir": False, + "is_file": True, + "is_link": False, + "link_target": None, + "__typename__": "StatReport", + }, + { + "magic": "Zip archive data, at least v2.0 to extract", + "mime_type": "application/zip", + "__typename__": "FileMagicReport", + }, + { + "md5": "9019fcece2433ad7f12c077e84537a74", + "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", + "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", + }, + { + "id": "test_basic_conversion:id", + "handler_name": "zip", + "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", + }, + ], + "subtasks": [ + { + "path": "/extractions/nonexistent_extract", + "depth": 314, + "blob_id": "test_basic_conversion:id", + "is_multi_file": False, + "__typename__": "Task", + } + ], + "__typename__": "TaskResult", + } + ] + + def test_exotic_command_output(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + report = ExtractCommandFailedReport( + command="dump all bytes", + stdout=bytes(range(256)), + stderr=b"stdout is pretty strange ;)", + exit_code=1, + ) + + task_result.add_report( + ChunkReport( + id="test", + handler_name="fail", + start_offset=0, + end_offset=256, + size=256, + is_encrypted=False, + extraction_reports=[report], + ) + ) + json_text = ProcessResult(results=[task_result]).to_json() + + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "id": "test", + "handler_name": "fail", + "start_offset": 0, + "end_offset": 256, + "size": 256, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [ + { + "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", + "stderr": "stdout is pretty strange ;)", + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", + } + ], + "__typename__": "ChunkReport", + } + ], + "subtasks": [], + "__typename__": "TaskResult", + } + ] + + @pytest.fixture def hello_kitty(tmp_path: Path) -> Path: """Generate an input file with 3 unknown chunks and 2 zip files.""" diff --git a/unblob/handlers/archive/sevenzip.py b/unblob/handlers/archive/sevenzip.py index e83c9461ad..c8656cd4e3 100644 --- a/unblob/handlers/archive/sevenzip.py +++ b/unblob/handlers/archive/sevenzip.py @@ -19,8 +19,9 @@ """ import binascii from pathlib import Path -from typing import Optional +from typing import Dict, Optional +from dissect.cstruct import Instance from structlog import get_logger from unblob.extractors import Command @@ -89,6 +90,9 @@ class SevenZipHandler(StructHandler): HEADER_STRUCT = HEADER_STRUCT EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}") + def get_metadata(self, header: Instance) -> Dict: + return {"version_maj": header.version_maj, "version_min": header.version_min} + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: header = self.parse_header(file) @@ -96,7 +100,11 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] size = calculate_sevenzip_size(header) - return ValidChunk(start_offset=start_offset, end_offset=start_offset + size) + metadata = self.get_metadata(header) + + return ValidChunk( + start_offset=start_offset, end_offset=start_offset + size, metadata=metadata + ) class MultiVolumeSevenZipHandler(DirectoryHandler): diff --git a/unblob/models.py b/unblob/models.py index 70217c8d75..31c4baaaeb 100644 --- a/unblob/models.py +++ b/unblob/models.py @@ -3,10 +3,11 @@ import json from enum import Enum from pathlib import Path -from typing import Iterable, List, Optional, Tuple, Type, TypeVar +from typing import Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union import attr import attrs +from dissect.cstruct import Instance from structlog import get_logger from .file_utils import Endian, File, InvalidInputFormat, StructParser @@ -29,6 +30,24 @@ # +def metadata_converter(obj: Union[Dict, Instance]) -> dict: + if isinstance(obj, dict): + return obj + if isinstance(obj, Instance): + result = {} + for k, v in obj._values.items(): # noqa: SLF001 + result[k] = v + return result + raise ValueError("Can only convert dict or Instance") + + +def metadata_validator(instance, attribute, value): + if attribute.name == "metadata" and isinstance(instance, Chunk): + for k, _ in value.items(): + if not isinstance(k, str): + raise TypeError("metadata keys must be string") + + @attr.define(frozen=True) class Task: path: Path @@ -102,6 +121,9 @@ class ValidChunk(Chunk): handler: "Handler" = attr.ib(init=False, eq=False) is_encrypted: bool = attr.ib(default=False) + metadata: dict = attr.ib( + factory=dict, converter=metadata_converter, validator=metadata_validator + ) def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]: if self.is_encrypted: @@ -122,6 +144,7 @@ def as_report(self, extraction_reports: List[Report]) -> ChunkReport: size=self.size, handler_name=self.handler.NAME, is_encrypted=self.is_encrypted, + metadata=self.metadata, extraction_reports=extraction_reports, ) @@ -250,10 +273,7 @@ def default(self, obj): return str(obj) if isinstance(obj, bytes): - try: - return obj.decode() - except UnicodeDecodeError: - return str(obj) + return obj.decode("utf-8", errors="surrogateescape") logger.error("JSONEncoder met a non-JSON encodable value", obj=obj) # the usual fail path of custom JSONEncoders is to call the parent and let it fail diff --git a/unblob/report.py b/unblob/report.py index 1a4fc5aef9..7cb1eb4351 100644 --- a/unblob/report.py +++ b/unblob/report.py @@ -223,6 +223,7 @@ class ChunkReport(Report): end_offset: int size: int is_encrypted: bool + metadata: dict = attr.ib(factory=dict) extraction_reports: List[Report]