Skip to content

Commit

Permalink
feat(processing): pattern auto-identification
Browse files Browse the repository at this point in the history
Integrate pattern recognition for unknown chunks in order to help
identify parts. Here we simply detect padding, but this could be
extended in the future to detect recurring patterns, encrypted
content, or even fingerprints.

Co-authored-by: Krisztián Fekete <[email protected]>
  • Loading branch information
qkaiser and e3krisztian committed Jan 3, 2024
1 parent e736358 commit a3f679b
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 6 deletions.
3 changes: 2 additions & 1 deletion tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ChunkReport,
FileMagicReport,
HashReport,
PaddingChunkReport,
StatReport,
UnknownChunkReport,
)
Expand Down Expand Up @@ -133,7 +134,7 @@ def hello_kitty_task_results(
size=7,
entropy=None,
),
UnknownChunkReport(
PaddingChunkReport(
id=ANY,
start_offset=263,
end_offset=264,
Expand Down
13 changes: 10 additions & 3 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import errno
import os
from pathlib import Path
from typing import Union

from structlog import get_logger

from .file_utils import carve, is_safe_path
from .models import Chunk, File, TaskResult, UnknownChunk, ValidChunk
from .models import Chunk, File, PaddingChunk, TaskResult, UnknownChunk, ValidChunk
from .report import MaliciousSymlinkRemoved

logger = get_logger()
Expand Down Expand Up @@ -113,8 +114,14 @@ def _fix_extracted_directory(directory: Path):
_fix_extracted_directory(outdir)


def carve_unknown_chunk(extract_dir: Path, file: File, chunk: UnknownChunk) -> Path:
filename = f"{chunk.start_offset}-{chunk.end_offset}.unknown"
def carve_unknown_chunk(
extract_dir: Path, file: File, chunk: Union[UnknownChunk, PaddingChunk]
) -> Path:
extension = "unknown"
if isinstance(chunk, PaddingChunk):
extension = "padding"

filename = f"{chunk.start_offset}-{chunk.end_offset}.{extension}"
carve_path = extract_dir / filename
logger.info("Extracting unknown chunk", path=carve_path, chunk=chunk)
carve_chunk_to_file(carve_path, file, chunk)
Expand Down
22 changes: 22 additions & 0 deletions unblob/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
EntropyReport,
ErrorReport,
MultiFileReport,
PaddingChunkReport,
Report,
UnknownChunkReport,
)
Expand Down Expand Up @@ -147,6 +148,27 @@ def as_report(self, entropy: Optional[EntropyReport]) -> UnknownChunkReport:
)


@attr.define(repr=False)
class PaddingChunk(Chunk):
    r"""Chunk of otherwise unknown data that was recognised as padding.

    ``process_patterns`` replaces an unknown chunk with a ``PaddingChunk`` when
    ``is_padding`` reports that its byte range holds a single repeated byte
    value (typically something like \x00 or \xFF).
    ``carve_unknown_chunk`` writes such chunks with a ``.padding`` extension
    instead of ``.unknown``.
    """

    def as_report(self, entropy: Optional[EntropyReport]) -> PaddingChunkReport:
        """Return a ``PaddingChunkReport`` mirroring this chunk.

        :param entropy: entropy report to attach, or ``None``; it is passed
            through unchanged.
        """
        report_fields = {
            "id": self.id,
            "start_offset": self.start_offset,
            "end_offset": self.end_offset,
            "size": self.size,
            "entropy": entropy,
        }
        return PaddingChunkReport(**report_fields)


@attrs.define
class MultiFile(Blob):
name: str = attr.field(kw_only=True)
Expand Down
29 changes: 27 additions & 2 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shutil
from operator import attrgetter
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type
from typing import Iterable, List, Optional, Sequence, Set, Tuple, Type, Union

import attr
import magic
Expand All @@ -24,6 +24,7 @@
ExtractError,
File,
MultiFile,
PaddingChunk,
ProcessResult,
Task,
TaskResult,
Expand Down Expand Up @@ -450,6 +451,29 @@ def _iterate_directory(self, extract_dirs, processed_paths):
)


def is_padding(
    file: File, chunk: UnknownChunk, *, buffer_size: int = 1024 * 1024
) -> bool:
    """Tell whether the chunk's byte range is one byte value repeated.

    The range is scanned in windows of at most ``buffer_size`` bytes and the
    scan stops at the first window that contains a different byte, so a large
    chunk is never copied into memory as a whole.

    :param file: sliceable view of the file content.
        NOTE(review): slices are assumed to be bytes-like - confirm against
        the ``File`` type.
    :param chunk: object providing ``start_offset`` and ``end_offset``.
    :param buffer_size: maximum number of bytes examined per step.
    :returns: ``True`` when the range is non-empty and all of its bytes are
        equal, ``False`` otherwise (including for an empty range).
    :raises ValueError: if ``buffer_size`` is not positive.
    """
    if buffer_size < 1:
        raise ValueError("buffer_size must be a positive integer")

    start, end = chunk.start_offset, chunk.end_offset
    if end <= start:
        # An empty range has no byte value at all, hence it is not padding.
        return False

    # The first byte of the range is the only possible fill value.
    fill = bytes(file[start : start + 1])
    if not fill:
        # The range starts past the end of the data: nothing to inspect.
        return False

    for offset in range(start, end, buffer_size):
        window = bytes(file[offset : min(offset + buffer_size, end)])
        # Counting a one-byte pattern counts the bytes equal to the fill value.
        if window.count(fill) != len(window):
            return False
    return True


def process_patterns(
    unknown_chunks: List[UnknownChunk], file: File
) -> List[Union[UnknownChunk, PaddingChunk]]:
    """Reclassify unknown chunks that match a known pattern.

    Padding is the only pattern detected: a chunk for which ``is_padding``
    returns a true value is replaced by a ``PaddingChunk`` carrying the same
    offsets, id and file; every other chunk is passed through untouched.
    The order of the input list is preserved.
    """

    def reclassify(candidate):
        # Non-padding chunks are returned as the very same object.
        if not is_padding(file, candidate):
            return candidate
        return PaddingChunk(
            start_offset=candidate.start_offset,
            end_offset=candidate.end_offset,
            id=candidate.id,
            file=candidate.file,
        )

    return [reclassify(candidate) for candidate in unknown_chunks]


class _FileTask:
def __init__(
self,
Expand Down Expand Up @@ -487,6 +511,7 @@ def process(self):
)
outer_chunks = remove_inner_chunks(all_chunks)
unknown_chunks = calculate_unknown_chunks(outer_chunks, self.size)
unknown_chunks = process_patterns(unknown_chunks, file)
assign_file_to_chunks(outer_chunks, file=file)
assign_file_to_chunks(unknown_chunks, file=file)

Expand All @@ -503,7 +528,7 @@ def _process_chunks(
self,
file: File,
outer_chunks: List[ValidChunk],
unknown_chunks: List[UnknownChunk],
unknown_chunks: List[Union[UnknownChunk, PaddingChunk]],
):
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)
Expand Down
10 changes: 10 additions & 0 deletions unblob/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ class UnknownChunkReport(Report):
entropy: Optional[EntropyReport]


@final
@attr.define(kw_only=True, frozen=True)
class PaddingChunkReport(Report):
    """Report entry for a chunk that was identified as padding.

    Instances are frozen and every field must be passed by keyword.
    ``PaddingChunk.as_report`` fills the fields from the chunk's own
    attributes of the same names.
    """

    # Identifier copied from the chunk being reported.
    id: str  # noqa: A003
    # Chunk boundaries as offsets, copied from the chunk.
    start_offset: int
    end_offset: int
    # Chunk size as reported by the chunk itself.
    size: int
    # Entropy report supplied by the caller of ``as_report``; may be None.
    entropy: Optional[EntropyReport]


@final
@attr.define(kw_only=True, frozen=True)
class MultiFileReport(Report):
Expand Down

0 comments on commit a3f679b

Please sign in to comment.