From d2347645c608bb24eb88afbcd06766c9a2aacbc9 Mon Sep 17 00:00:00 2001 From: Quentin Kaiser Date: Sun, 24 Dec 2023 16:27:58 +0100 Subject: [PATCH] feat(processing): add ability to skip files based on extension. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - there is a built in set of extensions to skip on - user specified extensions overwrite the built in set - empty extension is supported, which would result in skipping extraction of files without extension - giving an invalid extension (not empty, and does not start with .) will disable the builtin extension skip list Co-authored-by: Krisztián Fekete <1246751+e3krisztian@users.noreply.github.com> --- tests/test_cli.py | 31 +++++++++++++++++++++++++++++++ unblob/cli.py | 12 ++++++++++++ unblob/processing.py | 9 ++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 7620247f66..5857a9e187 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -302,3 +302,34 @@ def test_keep_extracted_chunks( process_file_mock.call_args.args[0].keep_extracted_chunks == keep_extracted_chunks ), fail_message + + +@pytest.mark.parametrize( + "skip_extension, extracted_files_count", + [ + pytest.param([], 5, id="skip-extension-empty"), + pytest.param([""], 5, id="skip-zip-extension-empty-suffix"), + pytest.param([".zip"], 1, id="skip-extension-zip"), + pytest.param([".rlib"], 5, id="skip-extension-rlib"), + ], +) +def test_skip_extension( + skip_extension: List[str], extracted_files_count: int, tmp_path: Path +): + runner = CliRunner() + in_path = ( + Path(__file__).parent + / "integration" + / "archive" + / "zip" + / "regular" + / "__input__" + / "apple.zip" + ) + args = [] + for suffix in skip_extension: + args += ["--skip-extension", suffix] + params = [*args, "--extract-dir", str(tmp_path), str(in_path)] + result = runner.invoke(unblob.cli.cli, params) + assert extracted_files_count == len(list(tmp_path.rglob("*"))) + assert 
result.exit_code == 0 diff --git a/unblob/cli.py b/unblob/cli.py index aeffb1b616..b556a56a7c 100755 --- a/unblob/cli.py +++ b/unblob/cli.py @@ -22,6 +22,7 @@ from .processing import ( DEFAULT_DEPTH, DEFAULT_PROCESS_NUM, + DEFAULT_SKIP_EXTENSION, DEFAULT_SKIP_MAGIC, ExtractionConfig, process_file, @@ -166,6 +167,15 @@ def __init__( show_default=True, multiple=True, ) +@click.option( + "--skip-extension", + "skip_extension", + type=click.STRING, + default=DEFAULT_SKIP_EXTENSION, + help="Skip processing files with given extension", + show_default=True, + multiple=True, +) @click.option( "-p", "--process-num", @@ -229,6 +239,7 @@ def cli( depth: int, entropy_depth: int, skip_magic: Iterable[str], + skip_extension: Iterable[str], skip_extraction: bool, # noqa: FBT001 keep_extracted_chunks: bool, # noqa: FBT001 handlers: Handlers, @@ -254,6 +265,7 @@ def cli( entropy_plot=bool(verbose >= 3), skip_extraction=skip_extraction, skip_magic=skip_magic, + skip_extension=skip_extension, process_num=process_num, handlers=handlers, dir_handlers=dir_handlers, diff --git a/unblob/processing.py b/unblob/processing.py index 72b500a301..553b75c74d 100644 --- a/unblob/processing.py +++ b/unblob/processing.py @@ -77,6 +77,7 @@ "Windows Embedded CE binary image", "Intel serial flash for PCH ROM", ) +DEFAULT_SKIP_EXTENSION = (".rlib",) @attr.define(kw_only=True) @@ -87,6 +88,7 @@ class ExtractionConfig: entropy_plot: bool = False max_depth: int = DEFAULT_DEPTH skip_magic: Iterable[str] = DEFAULT_SKIP_MAGIC + skip_extension: Iterable[str] = DEFAULT_SKIP_EXTENSION skip_extraction: bool = False process_num: int = DEFAULT_PROCESS_NUM keep_extracted_chunks: bool = False @@ -292,9 +294,14 @@ def _process_task(self, result: TaskResult, task: Task): should_skip_file = any( magic.startswith(pattern) for pattern in self._config.skip_magic ) + should_skip_file |= task.path.suffix in self._config.skip_extension if should_skip_file: - log.debug("Ignoring file based on magic", magic=magic) + log.debug( 
+ "Ignoring file based on magic or extension.", + magic=magic, + extension=task.path.suffix, + ) return _FileTask(self._config, task, stat_report.size, result).process()