diff --git a/tests/test_models.py b/tests/test_models.py index 927c66703f..050a9e0560 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -4,7 +4,15 @@ import pytest from unblob.file_utils import InvalidInputFormat -from unblob.models import Chunk, ProcessResult, Task, TaskResult, UnknownChunk, to_json +from unblob.models import ( + Chunk, + ProcessResult, + Task, + TaskResult, + UnknownChunk, + ValidChunk, + to_json, +) from unblob.report import ( ChunkReport, ExtractCommandFailedReport, @@ -153,56 +161,57 @@ def test_process_result_conversion(self): decoded_report = json.loads(json_text) assert decoded_report == [ { - "__typename__": "TaskResult", + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, "reports": [ { - "__typename__": "StatReport", + "path": "/nonexistent", + "size": 384, "is_dir": False, "is_file": True, "is_link": False, "link_target": None, - "path": "/nonexistent", - "size": 384, + "__typename__": "StatReport", }, { - "__typename__": "FileMagicReport", "magic": "Zip archive data, at least v2.0 to extract", "mime_type": "application/zip", + "__typename__": "FileMagicReport", }, { - "__typename__": "HashReport", "md5": "9019fcece2433ad7f12c077e84537a74", "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", }, { - "__typename__": "ChunkReport", - "end_offset": 384, - "extraction_reports": [], - "handler_name": "zip", "id": "test_basic_conversion:id", - "is_encrypted": False, - "size": 384, + "handler_name": "zip", "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", }, ], "subtasks": [ { - "__typename__": "Task", - "blob_id": "test_basic_conversion:id", + "path": "/extractions/nonexistent_extract", "depth": 314, + "blob_id": "test_basic_conversion:id", "is_multi_file": False, - "path": "/extractions/nonexistent_extract", + "__typename__": "Task", } ], - "task": { - "__typename__": "Task", - "blob_id": "", - "depth": 0, - "is_multi_file": False, - "path": "/nonexistent", - }, - }, + "__typename__": "TaskResult", + } ] def test_exotic_command_output(self): @@ -218,35 +227,44 @@ def test_exotic_command_output(self): decoded_report = json.loads(json_text) assert decoded_report == { - "__typename__": "ExtractCommandFailedReport", - "command": "dump all bytes", - "exit_code": 1, "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08" + "\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16" + "\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+," + "-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]" + "^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81" + "\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89" + "\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91" + "\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99" + "\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1" + "\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9" + "\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1" + "\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9" + "\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1" + "\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9" + "\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1" + "\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9" + "\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1" + "\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9" + "\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1" + "\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9" + "\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", "stderr": "stdout is pretty strange ;)", - "stdout": ( - "b'\\x00\\x01\\x02\\x03\\x04\\x05\\x06\\x07" - "\\x08\\t\\n\\x0b\\x0c\\r\\x0e\\x0f" - "\\x10\\x11\\x12\\x13\\x14\\x15\\x16\\x17" - '\\x18\\x19\\x1a\\x1b\\x1c\\x1d\\x1e\\x1f !"#' - "$%&\\'()*+,-./0123456789:;<=>?@AB" - "CDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]^_`a" - "bcdefghijklmnopqrstuvwxyz{|}~\\x7f" - "\\x80\\x81\\x82\\x83\\x84\\x85\\x86\\x87" - "\\x88\\x89\\x8a\\x8b\\x8c\\x8d\\x8e\\x8f" - "\\x90\\x91\\x92\\x93\\x94\\x95\\x96\\x97" - "\\x98\\x99\\x9a\\x9b\\x9c\\x9d\\x9e\\x9f" - "\\xa0\\xa1\\xa2\\xa3\\xa4\\xa5\\xa6\\xa7" - "\\xa8\\xa9\\xaa\\xab\\xac\\xad\\xae\\xaf" - "\\xb0\\xb1\\xb2\\xb3\\xb4\\xb5\\xb6\\xb7" - "\\xb8\\xb9\\xba\\xbb\\xbc\\xbd\\xbe\\xbf" - "\\xc0\\xc1\\xc2\\xc3\\xc4\\xc5\\xc6\\xc7" - "\\xc8\\xc9\\xca\\xcb\\xcc\\xcd\\xce\\xcf" - "\\xd0\\xd1\\xd2\\xd3\\xd4\\xd5\\xd6\\xd7" - "\\xd8\\xd9\\xda\\xdb\\xdc\\xdd\\xde\\xdf" - "\\xe0\\xe1\\xe2\\xe3\\xe4\\xe5\\xe6\\xe7" - "\\xe8\\xe9\\xea\\xeb\\xec\\xed\\xee\\xef" - "\\xf0\\xf1\\xf2\\xf3\\xf4\\xf5\\xf6\\xf7" - "\\xf8\\xf9\\xfa\\xfb\\xfc\\xfd\\xfe\\xff" - "'" - ), + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", } + + @pytest.mark.parametrize( + "metadata", + [ + pytest.param(1, id="metadata_int"), + pytest.param(0.2, id="metadata_float"), + pytest.param(True, id="metadata_bool"), + pytest.param([1, 2], id="metadata_list"), + pytest.param((1, 2), id="metadata_tuple"), + ], + ) + def test_invalid_metadata(self, metadata): + with pytest.raises(ValueError, match="Can only convert dict or Instance"): + ValidChunk(start_offset=0, end_offset=100, metadata=metadata) diff --git a/tests/test_report.py b/tests/test_report.py index 0a2ee0d39d..ce52a30d12 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -11,6 +11,7 @@ from unblob.processing import ExtractionConfig, process_file from unblob.report import ( ChunkReport, + ExtractCommandFailedReport, FileMagicReport, HashReport, StatReport, @@ -48,6 +49,178 @@ def test_process_file_report_output_is_valid_json( assert len(report) +class Test_ProcessResult_to_json: # noqa: N801 + def test_simple_conversion(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + chunk_id = "test_basic_conversion:id" + + task_result.add_report( + StatReport( + path=task.path, + size=384, + is_dir=False, + is_file=True, + is_link=False, + link_target=None, + ) + ) + task_result.add_report( + FileMagicReport( + magic="Zip archive data, at least v2.0 to extract", + mime_type="application/zip", + ) + ) + task_result.add_report( + HashReport( + md5="9019fcece2433ad7f12c077e84537a74", + sha1="36998218d8f43b69ef3adcadf2e8979e81eed166", + sha256="7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + ) + ) + task_result.add_report( + ChunkReport( + id=chunk_id, + handler_name="zip", + start_offset=0, + end_offset=384, + size=384, + is_encrypted=False, + metadata={}, + extraction_reports=[], + ) + ) + task_result.add_subtask( + Task( + path=Path("/extractions/nonexistent_extract"), + depth=314, + blob_id=chunk_id, + ) + ) + + json_text = ProcessResult(results=[task_result]).to_json() + + # output must be a valid json string + assert isinstance(json_text, str) + + # that can be loaded back + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "path": "/nonexistent", + "size": 384, + "is_dir": False, + "is_file": True, + "is_link": False, + "link_target": None, + "__typename__": "StatReport", + }, + { + "magic": "Zip archive data, at least v2.0 to extract", + "mime_type": "application/zip", + "__typename__": "FileMagicReport", + }, + { + "md5": "9019fcece2433ad7f12c077e84537a74", + "sha1": "36998218d8f43b69ef3adcadf2e8979e81eed166", + "sha256": "7d7ca7e1410b702b0f85d18257aebb964ac34f7fad0a0328d72e765bfcb21118", + "__typename__": "HashReport", + }, + { + "id": "test_basic_conversion:id", + "handler_name": "zip", + "start_offset": 0, + "end_offset": 384, + "size": 384, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [], + "__typename__": "ChunkReport", + }, + ], + "subtasks": [ + { + "path": "/extractions/nonexistent_extract", + "depth": 314, + "blob_id": "test_basic_conversion:id", + "is_multi_file": False, + "__typename__": "Task", + } + ], + "__typename__": "TaskResult", + } + ] + + def test_exotic_command_output(self): + task = Task(path=Path("/nonexistent"), depth=0, blob_id="") + task_result = TaskResult(task) + report = ExtractCommandFailedReport( + command="dump all bytes", + stdout=bytes(range(256)), + stderr=b"stdout is pretty strange ;)", + exit_code=1, + ) + + task_result.add_report( + ChunkReport( + id="test", + handler_name="fail", + start_offset=0, + end_offset=256, + size=256, + is_encrypted=False, + extraction_reports=[report], + ) + ) + json_text = ProcessResult(results=[task_result]).to_json() + + decoded_report = json.loads(json_text) + assert decoded_report == [ + { + "task": { + "path": "/nonexistent", + "depth": 0, + "blob_id": "", + "is_multi_file": False, + "__typename__": "Task", + }, + "reports": [ + { + "id": "test", + "handler_name": "fail", + "start_offset": 0, + "end_offset": 256, + "size": 256, + "is_encrypted": False, + "metadata": {}, + "extraction_reports": [ + { + "severity": "WARNING", + "command": "dump all bytes", + "stdout": "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\udc80\udc81\udc82\udc83\udc84\udc85\udc86\udc87\udc88\udc89\udc8a\udc8b\udc8c\udc8d\udc8e\udc8f\udc90\udc91\udc92\udc93\udc94\udc95\udc96\udc97\udc98\udc99\udc9a\udc9b\udc9c\udc9d\udc9e\udc9f\udca0\udca1\udca2\udca3\udca4\udca5\udca6\udca7\udca8\udca9\udcaa\udcab\udcac\udcad\udcae\udcaf\udcb0\udcb1\udcb2\udcb3\udcb4\udcb5\udcb6\udcb7\udcb8\udcb9\udcba\udcbb\udcbc\udcbd\udcbe\udcbf\udcc0\udcc1\udcc2\udcc3\udcc4\udcc5\udcc6\udcc7\udcc8\udcc9\udcca\udccb\udccc\udccd\udcce\udccf\udcd0\udcd1\udcd2\udcd3\udcd4\udcd5\udcd6\udcd7\udcd8\udcd9\udcda\udcdb\udcdc\udcdd\udcde\udcdf\udce0\udce1\udce2\udce3\udce4\udce5\udce6\udce7\udce8\udce9\udcea\udceb\udcec\udced\udcee\udcef\udcf0\udcf1\udcf2\udcf3\udcf4\udcf5\udcf6\udcf7\udcf8\udcf9\udcfa\udcfb\udcfc\udcfd\udcfe\udcff", + "stderr": "stdout is pretty strange ;)", + "exit_code": 1, + "__typename__": "ExtractCommandFailedReport", + } + ], + "__typename__": "ChunkReport", + } + ], + "subtasks": [], + "__typename__": "TaskResult", + } + ] + + @pytest.fixture def hello_kitty(tmp_path: Path) -> Path: """Generate an input file with 3 unknown chunks and 2 zip files.""" diff --git a/unblob/handlers/archive/sevenzip.py b/unblob/handlers/archive/sevenzip.py index e83c9461ad..c8656cd4e3 100644 --- a/unblob/handlers/archive/sevenzip.py +++ b/unblob/handlers/archive/sevenzip.py @@ -19,8 +19,9 @@ """ import binascii from pathlib import Path -from typing import Optional +from typing import Dict, Optional +from dissect.cstruct import Instance from structlog import get_logger from unblob.extractors import Command @@ -89,6 +90,9 @@ class SevenZipHandler(StructHandler): HEADER_STRUCT = HEADER_STRUCT EXTRACTOR = Command("7z", "x", "-p", "-y", "{inpath}", "-o{outdir}") + def get_metadata(self, header: Instance) -> Dict: + return {"version_maj": header.version_maj, "version_min": header.version_min} + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: header = self.parse_header(file) @@ -96,7 +100,11 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] size = calculate_sevenzip_size(header) - return ValidChunk(start_offset=start_offset, end_offset=start_offset + size) + metadata = self.get_metadata(header) + + return ValidChunk( + start_offset=start_offset, end_offset=start_offset + size, metadata=metadata + ) class MultiVolumeSevenZipHandler(DirectoryHandler): diff --git a/unblob/models.py b/unblob/models.py index 70217c8d75..31c4baaaeb 100644 --- a/unblob/models.py +++ b/unblob/models.py @@ -3,10 +3,11 @@ import json from enum import Enum from pathlib import Path -from typing import Iterable, List, Optional, Tuple, Type, TypeVar +from typing import Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union import attr import attrs +from dissect.cstruct import Instance from structlog import get_logger from .file_utils import Endian, File, InvalidInputFormat, StructParser @@ -29,6 +30,24 @@ # +def metadata_converter(obj: Union[Dict, Instance]) -> dict: + if isinstance(obj, dict): + return obj + if isinstance(obj, Instance): + result = {} + for k, v in obj._values.items(): # noqa: SLF001 + result[k] = v + return result + raise ValueError("Can only convert dict or Instance") + + +def metadata_validator(instance, attribute, value): + if attribute.name == "metadata" and isinstance(instance, Chunk): + for k, _ in value.items(): + if not isinstance(k, str): + raise TypeError("metadata keys must be string") + + @attr.define(frozen=True) class Task: path: Path @@ -102,6 +121,9 @@ class ValidChunk(Chunk): handler: "Handler" = attr.ib(init=False, eq=False) is_encrypted: bool = attr.ib(default=False) + metadata: dict = attr.ib( + factory=dict, converter=metadata_converter, validator=metadata_validator + ) def extract(self, inpath: Path, outdir: Path) -> Optional["ExtractResult"]: if self.is_encrypted: @@ -122,6 +144,7 @@ def as_report(self, extraction_reports: List[Report]) -> ChunkReport: size=self.size, handler_name=self.handler.NAME, is_encrypted=self.is_encrypted, + metadata=self.metadata, extraction_reports=extraction_reports, ) @@ -250,10 +273,7 @@ def default(self, obj): return str(obj) if isinstance(obj, bytes): - try: - return obj.decode() - except UnicodeDecodeError: - return str(obj) + return obj.decode("utf-8", errors="surrogateescape") logger.error("JSONEncoder met a non-JSON encodable value", obj=obj) # the usual fail path of custom JSONEncoders is to call the parent and let it fail diff --git a/unblob/report.py b/unblob/report.py index 1a4fc5aef9..7cb1eb4351 100644 --- a/unblob/report.py +++ b/unblob/report.py @@ -223,6 +223,7 @@ class ChunkReport(Report): end_offset: int size: int is_encrypted: bool + metadata: dict = attr.ib(factory=dict) extraction_reports: List[Report]