Skip to content

Commit

Permalink
Merge pull request #692 from onekey-sec/ui-no-extract-report
Browse files Browse the repository at this point in the history
Improve unblob "skip-extraction" mode of operation
  • Loading branch information
qkaiser authored Jan 3, 2024
2 parents bcbf49e + d1b6ccc commit 00b25fa
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 6 deletions.
34 changes: 34 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,37 @@ def test_skip_extension(
result = runner.invoke(unblob.cli.cli, params)
assert extracted_files_count == len(list(tmp_path.rglob("*")))
assert result.exit_code == 0


@pytest.mark.parametrize(
    "args, skip_extraction, fail_message",
    [
        ([], False, "Should *NOT* have skipped extraction"),
        (["-s"], True, "Should have skipped extraction"),
        (["--skip-extraction"], True, "Should have skipped extraction"),
    ],
)
def test_skip_extraction(
    args: List[str], skip_extraction: bool, fail_message: str, tmp_path: Path
):
    """The -s/--skip-extraction CLI flags must toggle config.skip_extraction."""
    zip_fixture = Path(__file__).parent.joinpath(
        "integration", "archive", "zip", "regular", "__input__", "apple.zip"
    )
    cli_args = [*args, "--extract-dir", str(tmp_path), str(zip_fixture)]

    # Stub out the actual processing; we only inspect the config it receives.
    with mock.patch.object(unblob.cli, "process_file") as process_file_mock:
        result = CliRunner().invoke(unblob.cli.cli, cli_args)

    assert result.exit_code == 0
    process_file_mock.assert_called_once()
    received_config = process_file_mock.call_args.args[0]
    assert received_config.skip_extraction == skip_extraction, fail_message
31 changes: 31 additions & 0 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,37 @@ def get_all(file_name, report_type: Type[ReportType]) -> List[ReportType]:
)


@pytest.mark.parametrize(
    "skip_extraction, file_count, extracted_file_count",
    [
        (True, 5, 0),
        (False, 5, 6),
    ],
)
def test_skip_extraction(
    skip_extraction: bool,
    file_count: int,
    extracted_file_count: int,
    tmp_path: Path,
    extraction_config: ExtractionConfig,
):
    """With skip_extraction on, no files may be carved to the extract root."""
    input_file = tmp_path / "input"
    with zipfile.ZipFile(input_file, "w") as zf:
        for i in range(file_count):
            zf.writestr(f"file{i}", data=b"This is a test file.")

    extraction_config.extract_root = tmp_path / "output"
    extraction_config.skip_extraction = skip_extraction

    results = process_file(extraction_config, input_file).results
    results_by_path = {result.task.path: result for result in results}

    # One task result for the input itself, plus one per extracted file.
    assert len(results_by_path) == extracted_file_count + 1
    files_on_disk = list(extraction_config.extract_root.rglob("**/*"))
    assert len(files_on_disk) == extracted_file_count


class ConcatenateExtractor(DirectoryExtractor):
def extract(self, paths: List[Path], outdir: Path):
outfile = outdir / "data"
Expand Down
59 changes: 56 additions & 3 deletions unblob/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
import click
from rich.console import Console
from rich.panel import Panel
from rich.style import Style
from rich.table import Table
from structlog import get_logger

from unblob.models import DirectoryHandlers, Handlers, ProcessResult
from unblob.plugins import UnblobPluginManager
from unblob.report import ChunkReport, Severity, StatReport, UnknownChunkReport
from unblob.report import (
ChunkReport,
Severity,
StatReport,
UnknownChunkReport,
)

from .cli_options import verbosity_option
from .dependencies import get_dependencies, pretty_format_dependencies
Expand Down Expand Up @@ -200,7 +206,7 @@ def __init__(
)
@click.option(
"-s",
"--skip_extraction",
"--skip-extraction",
"skip_extraction",
is_flag=True,
show_default=True,
Expand Down Expand Up @@ -279,7 +285,10 @@ def cli(
logger.info("Start processing file", file=file)
process_results = process_file(config, file, report_file)
if verbose == 0:
print_report(process_results)
if skip_extraction:
print_scan_report(process_results)
else:
print_report(process_results)
return process_results


Expand Down Expand Up @@ -349,6 +358,50 @@ def get_size_report(task_results: List) -> Tuple[int, int, int, int]:
return total_files, total_dirs, total_links, extracted_size


def print_scan_report(reports: ProcessResult):
    """Print a table of identified chunks to stderr.

    Used when extraction is skipped: instead of the usual extraction
    summary, show each known/unknown chunk's offsets, size and
    description, sorted by start offset within every task result.
    """
    console = Console(stderr=True)

    table = Table(
        expand=False,
        show_lines=True,
        show_edge=True,
        style=Style(color="white"),
        header_style=Style(color="white"),
        row_styles=[Style(color="red")],
    )
    for heading in ("Start offset", "End offset", "Size", "Description"):
        table.add_column(heading)

    for task_result in reports.results:
        chunks = sorted(
            (
                report
                for report in task_result.reports
                if isinstance(report, (ChunkReport, UnknownChunkReport))
            ),
            key=lambda report: report.start_offset,
        )

        for chunk in chunks:
            if isinstance(chunk, ChunkReport):
                # Known chunk: show the handler that identified it.
                description = chunk.handler_name
                row_style = Style(color="#00FFC8")
            elif isinstance(chunk, UnknownChunkReport):
                description = "unknown"
                row_style = Style(color="#008ED5")
            else:  # unreachable: the filter above admits only these two types
                continue
            table.add_row(
                f"{chunk.start_offset:0d}",
                f"{chunk.end_offset:0d}",
                human_size(chunk.size),
                description,
                style=row_style,
            )

    console.print(table)


def print_report(reports: ProcessResult):
total_files, total_dirs, total_links, extracted_size = get_size_report(
reports.results
Expand Down
14 changes: 11 additions & 3 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ def process_file(

process_result = _process_task(config, task)

# ensure that the root extraction directory is created even for empty extractions
extract_dir.mkdir(parents=True, exist_ok=True)
if not config.skip_extraction:
# ensure that the root extraction directory is created even for empty extractions
extract_dir.mkdir(parents=True, exist_ok=True)

if report_file:
write_json_report(report_file, process_result)
Expand Down Expand Up @@ -475,7 +476,7 @@ def __init__(
def process(self):
logger.debug("Processing file", path=self.task.path, size=self.size)

if self.carve_dir.exists():
if self.carve_dir.exists() and not self.config.skip_extraction:
# Extraction directory is not supposed to exist, it is usually a simple mistake of running
# unblob again without cleaning up or using --force.
# It would cause problems continuing, as it would mix up original and extracted files,
Expand Down Expand Up @@ -515,6 +516,13 @@ def _process_chunks(
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)

if self.config.skip_extraction:
for chunk in unknown_chunks:
self.result.add_report(chunk.as_report(entropy=None))
for chunk in outer_chunks:
self.result.add_report(chunk.as_report(extraction_reports=[]))
return

for chunk in unknown_chunks:
carved_unknown_path = carve_unknown_chunk(self.carve_dir, file, chunk)
entropy = self._calculate_entropy(carved_unknown_path)
Expand Down

0 comments on commit 00b25fa

Please sign in to comment.