From 6c320ca4d62ae9096777b60d4af06a8df46d3f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Fri, 25 Oct 2024 10:36:08 +0200 Subject: [PATCH] Binary search: show matching string (#1270) * db: add column to binary search cache for matched string data * feat: show matching strings in binary search * chore: replaced firmware_detail_tabular_field filter with macro * fix: fixed text overflow in search results * fix: fixed wording on DB browse/search results pagination * feat: made stored yara search strings count configurable * requested review changes #1270 * removed unused method _query_has_only_one_result from database routes * removed unused artifact from GraphQL search from base.html --- src/config.py | 4 + src/config/fact-core-config.toml | 4 + src/conftest.py | 1 + src/helperFunctions/yara_binary_search.py | 64 ++++++--- src/intercom/back_end_binding.py | 4 +- src/storage/db_interface_frontend.py | 7 +- src/storage/db_interface_frontend_editing.py | 9 +- ...dded_matching_strings_to_binary_search_.py | 27 ++++ src/storage/schema.py | 1 + src/test/data/fact-core-config.toml | 4 + .../data/fact-core-config.toml-missing-entrys | 4 + .../storage/test_db_interface_frontend.py | 12 +- .../test_db_interface_frontend_editing.py | 8 +- .../test_yara_binary_search.py | 45 +++++-- .../web_interface/test_app_binary_search.py | 2 +- .../web_interface/test_app_jinja_filter.py | 7 - .../components/compare_routes.py | 2 +- .../components/database_routes.py | 118 +++++++++++------ src/web_interface/components/jinja_filter.py | 5 - src/web_interface/pagination.py | 2 +- src/web_interface/rest/rest_binary_search.py | 9 +- .../static/js/show_analysis_preview.js | 25 ++-- src/web_interface/templates/base.html | 124 ++++++++++-------- .../templates/database/database_browse.html | 81 +++++++++++- .../firmware_detail_tabular_field.html | 12 -- src/web_interface/templates/home.html | 5 +- src/web_interface/templates/macros.html | 18 +++ .../templates/show_analysis.html | 6 +- 28 files changed, 432 insertions(+), 178 deletions(-) create mode 100644 src/storage/migration/versions/81a549a2be95_added_matching_strings_to_binary_search_.py delete mode 100644 src/web_interface/templates/generic_view/firmware_detail_tabular_field.html diff --git a/src/config.py b/src/config.py index 599bf6dda..86c712ca0 100644 --- a/src/config.py +++ b/src/config.py @@ -123,6 +123,9 @@ class Unpacking(BaseModel): delay: float base_port: int + class BinarySearch(BaseModel): + max_strings_per_match: int = 10 + class PluginDefaults(BaseModel): processes: int @@ -135,6 +138,7 @@ class Plugin(BaseModel): collector_worker_count: int = 2 unpacking: Backend.Unpacking + binary_search: Backend.BinarySearch firmware_file_storage_directory: str diff --git a/src/config/fact-core-config.toml b/src/config/fact-core-config.toml index 3f45d96fc..e51ac11c0 100644 --- a/src/config/fact-core-config.toml +++ b/src/config/fact-core-config.toml @@ -98,6 +98,10 @@ base-port = 9900 # if you experience FileNotFound errors during unpacking, increasing this value slightly might help delay = 0.0 +[backend.binary-search] +# maximum number of matching strings stored per match +max-strings-per-match = 10 + [[backend.plugin]] name = "cpu_architecture" diff --git a/src/conftest.py b/src/conftest.py index 1a49ac526..f727e0db0 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -117,6 +117,7 @@ def backend_config(request, common_config, _firmware_file_storage_directory) -> 'delay': 0.0, 'base_port': 9900, }, + 'binary_search': {'max_strings_per_match': 10}, 'plugin': { 'cpu_architecture': {'name': 'cpu_architecture', 'processes': 4}, 'cve_lookup': {'name': 'cve_lookup', 'processes': 2}, diff --git a/src/helperFunctions/yara_binary_search.py b/src/helperFunctions/yara_binary_search.py index b8919494a..bad06d8fd 100644 --- a/src/helperFunctions/yara_binary_search.py +++ b/src/helperFunctions/yara_binary_search.py @@ -1,7 +1,7 @@ from __future__ import annotations +import re import subprocess -from os.path import basename from pathlib import Path from subprocess import PIPE, STDOUT, CalledProcessError from tempfile import NamedTemporaryFile @@ -37,7 +37,8 @@ def _execute_yara_search(self, rule_file_path: str, target_path: str | None = No :return: The output from the yara scan. """ compiled_flag = '-C' if Path(rule_file_path).read_bytes().startswith(b'YARA') else '' - command = f'yara -r {compiled_flag} {rule_file_path} {target_path or self.db_path}' + # -r: recursive, -s: print strings, -N: no follow symlinks + command = f'yara -r -s -N {compiled_flag} {rule_file_path} {target_path or self.db_path}' yara_process = subprocess.run(command, shell=True, stdout=PIPE, stderr=STDOUT, text=True, check=False) return yara_process.stdout @@ -50,24 +51,55 @@ def _get_file_paths_of_files_included_in_fw(self, fw_uid: str) -> list[str]: return [self.fs_organizer.generate_path_from_uid(uid) for uid in self.db.get_all_files_in_fw(fw_uid)] @staticmethod - def _parse_raw_result(raw_result: str) -> dict[str, list[str]]: + def _parse_raw_result(raw_result: str) -> dict[str, dict[str, list[dict]]]: """ + YARA scan results have the following structure: + + :: + :: + ... + + ... + + We parse the results and put them into a dictionary of the following form: + { + : { + : [ + { + "offset": , + "condition": , + "match": , + }, + ... (max match_limit) + ] + }, + ... + } + :param raw_result: raw yara scan result - :return: dict of matching rules with lists of matched UIDs as values + :return: dict of matching files, rules and strings """ results = {} - for line in raw_result.split('\n'): - if line and 'warning' not in line: - rule, match = line.split(' ') - results.setdefault(rule, []).append(basename(match)) # noqa: PTH119 + for result_str in re.findall( + # + r'[a-zA-Z_][a-zA-Z0-9_]+ [^\n]+\n(?:0x[0-9a-f]+:\$[a-zA-Z0-9_]+: .+\n)+', + raw_result, + ): + rule_str, *match_lines = result_str.splitlines() + rule, path_str = rule_str.split(' ', maxsplit=1) + uid = Path(path_str).name + results.setdefault(uid, {}).setdefault(rule, []) + for match_line in match_lines: + offset, condition, match_str = match_line.split(':', maxsplit=2) + match_str = match_str[1:] # remove the space at the beginning + results[uid][rule].append({'offset': offset, 'condition': condition, 'match': match_str}) + if len(results[uid][rule]) >= config.backend.binary_search.max_strings_per_match: + # only collect at most matching strings to avoid storing loads of unnecessary data + # in case of very general rules with lots of matches + break return results - @staticmethod - def _eliminate_duplicates(result_dict: dict[str, list[str]]): - for key in result_dict: - result_dict[key] = sorted(set(result_dict[key])) - - def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str, list[str]] | str: + def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str, dict[str, list[dict]]] | str: """ Perform a yara search on the files in the database. @@ -80,9 +112,7 @@ def get_binary_search_result(self, task: tuple[bytes, str | None]) -> dict[str, try: self._prepare_temp_rule_file(temp_rule_file, yara_rules) raw_result = self._get_raw_result(firmware_uid, temp_rule_file) - results = self._parse_raw_result(raw_result) - self._eliminate_duplicates(results) - return results + return self._parse_raw_result(raw_result) except yara.SyntaxError as yara_error: return f'There seems to be an error in the rule file:\n{yara_error}' except CalledProcessError as process_error: diff --git a/src/intercom/back_end_binding.py b/src/intercom/back_end_binding.py index 621fa1447..655598371 100644 --- a/src/intercom/back_end_binding.py +++ b/src/intercom/back_end_binding.py @@ -200,8 +200,8 @@ class InterComBackEndBinarySearchTask(InterComListenerAndResponder): def get_response(self, task): yara_binary_searcher = YaraBinarySearchScanner() - uid_list = yara_binary_searcher.get_binary_search_result(task) - return uid_list, task + search_result = yara_binary_searcher.get_binary_search_result(task) + return search_result, task class InterComBackEndDeleteFile(InterComListenerAndResponder): diff --git a/src/storage/db_interface_frontend.py b/src/storage/db_interface_frontend.py index 893d587eb..98b0251d5 100644 --- a/src/storage/db_interface_frontend.py +++ b/src/storage/db_interface_frontend.py @@ -37,6 +37,7 @@ class MetaEntry(NamedTuple): class CachedQuery(NamedTuple): query: str yara_rule: str + match_data: dict[str, dict[str, list[dict]]] | None class FrontEndDbInterface(DbInterfaceCommon): @@ -369,7 +370,11 @@ def get_query_from_cache(self, query_id: str) -> CachedQuery | None: entry: SearchCacheEntry = session.get(SearchCacheEntry, query_id) if entry is None: return None - return CachedQuery(query=entry.query, yara_rule=entry.yara_rule) + return CachedQuery( + query=entry.query, + yara_rule=entry.yara_rule, + match_data=entry.match_data, + ) def get_total_cached_query_count(self): with self.get_read_only_session() as session: diff --git a/src/storage/db_interface_frontend_editing.py b/src/storage/db_interface_frontend_editing.py index b6f041d7c..d9517db59 100644 --- a/src/storage/db_interface_frontend_editing.py +++ b/src/storage/db_interface_frontend_editing.py @@ -17,12 +17,17 @@ def delete_comment(self, uid, timestamp): fo_entry: FileObjectEntry = session.get(FileObjectEntry, uid) fo_entry.comments = [comment for comment in fo_entry.comments if comment['time'] != timestamp] - def add_to_search_query_cache(self, search_query: str, query_title: str | None = None) -> str: + def add_to_search_query_cache(self, search_query: str, match_data: dict, query_title: str | None = None) -> str: query_uid = create_uid(query_title.encode()) with self.get_read_write_session() as session: old_entry = session.get(SearchCacheEntry, query_uid) if old_entry is not None: # update existing entry session.delete(old_entry) - new_entry = SearchCacheEntry(uid=query_uid, query=search_query, yara_rule=query_title) + new_entry = SearchCacheEntry( + uid=query_uid, + query=search_query, + yara_rule=query_title, + match_data=match_data, + ) session.add(new_entry) return query_uid diff --git a/src/storage/migration/versions/81a549a2be95_added_matching_strings_to_binary_search_.py b/src/storage/migration/versions/81a549a2be95_added_matching_strings_to_binary_search_.py new file mode 100644 index 000000000..e69fc8a4f --- /dev/null +++ b/src/storage/migration/versions/81a549a2be95_added_matching_strings_to_binary_search_.py @@ -0,0 +1,27 @@ +"""Added matching strings to binary search cache + +Revision ID: 81a549a2be95 +Revises: 05d8effce8b3 +Create Date: 2024-06-24 17:00:37.464098 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '81a549a2be95' +down_revision = '05d8effce8b3' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'search_cache', + sa.Column('match_data', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column('search_cache', 'match_data') diff --git a/src/storage/schema.py b/src/storage/schema.py index 220d661f8..06afc232c 100644 --- a/src/storage/schema.py +++ b/src/storage/schema.py @@ -171,6 +171,7 @@ class SearchCacheEntry(Base): uid = mapped_column(UID, primary_key=True) query = mapped_column(VARCHAR, nullable=False) # the query that searches for the files that the YARA rule matched yara_rule = mapped_column(VARCHAR, nullable=False) + match_data = mapped_column(MutableDict.as_mutable(JSONB), nullable=True) class WebInterfaceTemplateEntry(Base): diff --git a/src/test/data/fact-core-config.toml b/src/test/data/fact-core-config.toml index f48535888..5f7d9fed7 100644 --- a/src/test/data/fact-core-config.toml +++ b/src/test/data/fact-core-config.toml @@ -87,6 +87,10 @@ base-port = 9900 # if you experience FileNotFound errors during unpacking, increasing this value slightly might help delay = 0.0 +[backend.binary-search] +# maximum number of matching strings stored per match +max-strings-per-match = 10 + [[backend.plugin]] name = "cpu_architecture" diff --git a/src/test/data/fact-core-config.toml-missing-entrys b/src/test/data/fact-core-config.toml-missing-entrys index ff2c0ae0f..52714c894 100644 --- a/src/test/data/fact-core-config.toml-missing-entrys +++ b/src/test/data/fact-core-config.toml-missing-entrys @@ -87,6 +87,10 @@ base-port = 9900 # if you experience FileNotFound errors during unpacking, increasing this value slightly might help delay = 0.0 +[backend.binary-search] +# maximum number of matching strings stored per match +max-strings-per-match = 10 + [[backend.plugin]] name = "cpu_architecture" diff --git a/src/test/integration/storage/test_db_interface_frontend.py b/src/test/integration/storage/test_db_interface_frontend.py index fb3ea143e..a46307c5c 100644 --- a/src/test/integration/storage/test_db_interface_frontend.py +++ b/src/test/integration/storage/test_db_interface_frontend.py @@ -568,28 +568,30 @@ def test_get_tag_list(frontend_db, backend_db): def test_get_query_from_cache(frontend_db, frontend_editing_db): assert frontend_db.get_query_from_cache('non-existent') is None - id_ = frontend_editing_db.add_to_search_query_cache('foo', 'bar') + match_data = {'uid': {'rule': []}} + id_ = frontend_editing_db.add_to_search_query_cache('foo', match_data, 'bar') entry = frontend_db.get_query_from_cache(id_) assert isinstance(entry, CachedQuery) assert entry.query == 'foo' assert entry.yara_rule == 'bar' + assert entry.match_data == match_data def test_get_cached_count(frontend_db, frontend_editing_db): assert frontend_db.get_total_cached_query_count() == 0 - frontend_editing_db.add_to_search_query_cache('foo', 'bar') + frontend_editing_db.add_to_search_query_cache('foo', {}, 'bar') assert frontend_db.get_total_cached_query_count() == 1 - frontend_editing_db.add_to_search_query_cache('bar', 'foo') + frontend_editing_db.add_to_search_query_cache('bar', {}, 'foo') assert frontend_db.get_total_cached_query_count() == 2 # noqa: PLR2004 def test_search_query_cache(frontend_db, frontend_editing_db): assert frontend_db.search_query_cache(offset=0, limit=10) == [] - id1 = frontend_editing_db.add_to_search_query_cache('foo', 'rule bar{}') - id2 = frontend_editing_db.add_to_search_query_cache('bar', 'rule foo{}') + id1 = frontend_editing_db.add_to_search_query_cache('foo', {}, 'rule bar{}') + id2 = frontend_editing_db.add_to_search_query_cache('bar', {}, 'rule foo{}') assert sorted(frontend_db.search_query_cache(offset=0, limit=10)) == [ (id1, 'rule bar{}', ['bar']), (id2, 'rule foo{}', ['foo']), diff --git a/src/test/integration/storage/test_db_interface_frontend_editing.py b/src/test/integration/storage/test_db_interface_frontend_editing.py index a18c4ddb0..1a88f9da7 100644 --- a/src/test/integration/storage/test_db_interface_frontend_editing.py +++ b/src/test/integration/storage/test_db_interface_frontend_editing.py @@ -35,18 +35,20 @@ def test_search_cache_insert(frontend_editing_db, frontend_db): result = frontend_db.get_query_from_cache(RULE_UID) assert result is None - result = frontend_editing_db.add_to_search_query_cache('{"foo": "bar"}', 'rule foo{}') + match_data = {'some_uid': {'foo': []}} + result = frontend_editing_db.add_to_search_query_cache('{"foo": "bar"}', match_data, 'rule foo{}') assert result == RULE_UID result = frontend_db.get_query_from_cache(RULE_UID) assert isinstance(result, CachedQuery) assert result.query == '{"foo": "bar"}' assert result.yara_rule == 'rule foo{}' + assert result.match_data == match_data def test_search_cache_update(frontend_editing_db, frontend_db): - assert frontend_editing_db.add_to_search_query_cache('{"uid": "some uid"}', 'rule foo{}') == RULE_UID + assert frontend_editing_db.add_to_search_query_cache('{"uid": "some uid"}', {}, 'rule foo{}') == RULE_UID # update - assert frontend_editing_db.add_to_search_query_cache('{"uid": "some other uid"}', 'rule foo{}') == RULE_UID + assert frontend_editing_db.add_to_search_query_cache('{"uid": "some other uid"}', {}, 'rule foo{}') == RULE_UID assert frontend_db.get_query_from_cache(RULE_UID).query == '{"uid": "some other uid"}' diff --git a/src/test/unit/helperFunctions/test_yara_binary_search.py b/src/test/unit/helperFunctions/test_yara_binary_search.py index 52d29061e..3f536abc0 100644 --- a/src/test/unit/helperFunctions/test_yara_binary_search.py +++ b/src/test/unit/helperFunctions/test_yara_binary_search.py @@ -12,6 +12,7 @@ TEST_FILE_1 = 'binary_search_test' TEST_FILE_2 = 'binary_search_test_2' TEST_FILE_3 = 'binary_search_test_3' +MATCH_DATA_KEYS = {'condition', 'match', 'offset'} class MockCommonDbInterface: @@ -37,12 +38,20 @@ def setUp(self): def test_get_binary_search_result(self): result = self.yara_binary_scanner.get_binary_search_result((self.yara_rule, None)) - assert result == {'test_rule': [TEST_FILE_1]} + assert TEST_FILE_1 in result + assert 'test_rule' in result[TEST_FILE_1] + match_data = result[TEST_FILE_1]['test_rule'] + assert len(match_data) == 1 + assert all(k in m for k in MATCH_DATA_KEYS for m in match_data) def test_get_binary_search_result_for_single_firmware(self): yara_rule = b'rule test_rule_2 {strings: $a = "TEST_STRING!" condition: $a}' result = self.yara_binary_scanner.get_binary_search_result((yara_rule, 'single_firmware')) - assert result == {'test_rule_2': [TEST_FILE_2]} + assert TEST_FILE_2 in result + assert 'test_rule_2' in result[TEST_FILE_2] + match_data = result[TEST_FILE_2]['test_rule_2'] + assert len(match_data) == 1 + assert all(k in m for k in MATCH_DATA_KEYS for m in match_data) result = self.yara_binary_scanner.get_binary_search_result((yara_rule, 'foobar')) assert result == {} @@ -58,15 +67,33 @@ def test_get_binary_search_yara_error(self, _): # noqa: PT019 assert isinstance(result, str) assert 'Error when calling YARA' in result - def test_eliminate_duplicates(self): - test_dict = {1: [1, 2, 3, 3], 2: [1, 1, 2, 3]} - self.yara_binary_scanner._eliminate_duplicates(test_dict) - assert test_dict == {1: [1, 2, 3], 2: [1, 2, 3]} - def test_parse_raw_result(self): - raw_result = 'rule_1 match_1\nrule_1 match_2\nrule_2 match_1' + raw_result = ( + 'rule_1 /media/data/fact_fw_data/00/uid1\n' + '0x123:$a: foo\n' + '0x456:$a: bar\n' + 'rule_1 /media/data/fact_fw_data/99/uid2\n' + '0x321:$b: test123\n' + 'rule_2 /media/data/fact_fw_data/00/uid1\n' + '0x666:$c: deadbeef\n' + ) result = self.yara_binary_scanner._parse_raw_result(raw_result) - assert result == {'rule_1': ['match_1', 'match_2'], 'rule_2': ['match_1']} + assert result == { + 'uid1': { + 'rule_1': [ + {'condition': '$a', 'match': 'foo', 'offset': '0x123'}, + {'condition': '$a', 'match': 'bar', 'offset': '0x456'}, + ], + 'rule_2': [ + {'condition': '$c', 'match': 'deadbeef', 'offset': '0x666'}, + ], + }, + 'uid2': { + 'rule_1': [ + {'condition': '$b', 'match': 'test123', 'offset': '0x321'}, + ], + }, + } def test_execute_yara_search(self): test_rule_path = path.join(get_test_data_dir(), 'yara_binary_search_test_rule') # noqa: PTH118 diff --git a/src/test/unit/web_interface/test_app_binary_search.py b/src/test/unit/web_interface/test_app_binary_search.py index 3d5a6d2db..ddc647f6e 100644 --- a/src/test/unit/web_interface/test_app_binary_search.py +++ b/src/test/unit/web_interface/test_app_binary_search.py @@ -22,7 +22,7 @@ def add_to_search_query_cache(*_, **__): @staticmethod def get_query_from_cache(query_id): if query_id == QUERY_CACHE_UID: - return CachedQuery(query='{"uid": {"$in": ["test_uid"]}}', yara_rule='some yara rule') + return CachedQuery(query='{"uid": {"$in": ["test_uid"]}}', yara_rule='some yara rule', match_data={}) return None diff --git a/src/test/unit/web_interface/test_app_jinja_filter.py b/src/test/unit/web_interface/test_app_jinja_filter.py index ddb5eed10..e668b1043 100644 --- a/src/test/unit/web_interface/test_app_jinja_filter.py +++ b/src/test/unit/web_interface/test_app_jinja_filter.py @@ -1,7 +1,6 @@ import pytest from flask import render_template_string -from storage.db_interface_frontend import MetaEntry from web_interface.components.jinja_filter import FilterClass @@ -19,12 +18,6 @@ def test_filter_replace_uid_with_file_name(self, web_frontend, filter_class): result = _get_template_filter_output(web_frontend, test_string, 'replace_uid_with_file_name') assert '>test_name<' in result - def test_filter_firmware_detail_tabular_field(self, web_frontend, filter_class): - test_firmware_meta_data = MetaEntry('UID', 'HID', {'tag1': 'danger', 'tag2': 'default'}, 0) - result = _get_template_filter_output(web_frontend, test_firmware_meta_data, 'firmware_detail_tabular_field') - for expected_part in ['/analysis/UID', 'HID', 'tag1<', 'tag2<']: - assert expected_part in result - def test_filter_replace_uid_with_hid(self, filter_class): one_uid = f'{"a" * 64}_1234' assert filter_class._filter_replace_uid_with_hid(f'{one_uid}_{one_uid}') == 'TEST_FW_HID_TEST_FW_HID' diff --git a/src/web_interface/components/compare_routes.py b/src/web_interface/components/compare_routes.py index a54439265..7231c2f3f 100644 --- a/src/web_interface/components/compare_routes.py +++ b/src/web_interface/components/compare_routes.py @@ -114,7 +114,7 @@ def browse_comparisons(self): total = comparison_db.get_total_number_of_results() - pagination = get_pagination(page=page, per_page=per_page, total=total, record_name='compare results') + pagination = get_pagination(page=page, per_page=per_page, total=total) return render_template( 'database/compare_browse.html', compare_list=compare_list, diff --git a/src/web_interface/components/database_routes.py b/src/web_interface/components/database_routes.py index da1f1e873..2755afb52 100644 --- a/src/web_interface/components/database_routes.py +++ b/src/web_interface/components/database_routes.py @@ -1,7 +1,10 @@ +from __future__ import annotations + import json import logging +from dataclasses import dataclass from datetime import datetime -from itertools import chain +from enum import Enum from flask import redirect, render_template, request, url_for from sqlalchemy.exc import SQLAlchemyError @@ -19,6 +22,22 @@ from web_interface.security.privileges import PRIVILEGES +@dataclass +class SearchParameters: + class TargetType(str, Enum): + yara = 'YARA' + file = 'File' + firmware = 'Firmware' + inverted = 'Inverse Firmware' + + query: dict | str + only_firmware: bool + inverted: bool + search_target: TargetType + query_title: str + yara_match_data: dict[str, dict[str, list[dict]]] | None + + class DatabaseRoutes(ComponentBase): @staticmethod def _add_date_to_query(query, date): @@ -35,18 +54,19 @@ def _add_date_to_query(query, date): @roles_accepted(*PRIVILEGES['basic_search']) @AppRoute('/database/browse', GET) - def browse_database(self, query: str = '{}', only_firmwares=False, inverted=False): + def browse_database(self, query: str = '{}'): page, per_page = extract_pagination_from_request(request)[0:2] - search_parameters = self._get_search_parameters(query, only_firmwares, inverted) + offset, limit = per_page * (page - 1), per_page + parameters = self._get_search_parameters(query) with get_shared_session(self.db.frontend) as frontend_db: try: firmware_list = self._search_database( - search_parameters['query'], - skip=per_page * (page - 1), - limit=per_page, - only_firmwares=search_parameters['only_firmware'], - inverted=search_parameters['inverted'], + parameters.query, + skip=offset, + limit=limit, + only_firmwares=parameters.only_firmware, + inverted=parameters.inverted, ) except QueryConversionException as exception: error_message = exception.get_message() @@ -57,12 +77,12 @@ def browse_database(self, query: str = '{}', only_firmwares=False, inverted=Fals return render_template('error.html', message=error_message) total = frontend_db.get_number_of_total_matches( - search_parameters['query'], search_parameters['only_firmware'], inverted=search_parameters['inverted'] + parameters.query, parameters.only_firmware, parameters.inverted ) device_classes = frontend_db.get_device_class_list() vendors = frontend_db.get_vendor_list() - pagination = get_pagination(page=page, per_page=per_page, total=total, record_name='firmwares') + pagination = get_pagination(page=page, per_page=per_page, total=total) return render_template( 'database/database_browse.html', firmware_list=firmware_list, @@ -73,7 +93,7 @@ def browse_database(self, query: str = '{}', only_firmwares=False, inverted=Fals vendors=vendors, current_class=str(request.args.get('device_class')), current_vendor=str(request.args.get('vendor')), - search_parameters=search_parameters, + search_parameters=parameters, ) @roles_accepted(*PRIVILEGES['pattern_search']) @@ -98,31 +118,43 @@ def browse_searches(self): pagination=pagination, ) - def _get_search_parameters(self, query, only_firmware, inverted): + def _get_search_parameters(self, query_str: str) -> SearchParameters: """ This function prepares the requested search by parsing all necessary parameters. In case of a binary search, indicated by the query being an uid instead of a dict, the cached search result is retrieved. """ - search_parameters = {} - if request.args.get('query'): - query = request.args.get('query') - if is_uid(query): - cached_query = self.db.frontend.get_query_from_cache(query) - query = cached_query.query - search_parameters['query_title'] = cached_query.yara_rule - search_parameters['only_firmware'] = ( - request.args.get('only_firmwares') == 'True' if request.args.get('only_firmwares') else only_firmware + query_str = request.args.get('query', query_str) + only_firmware = request.args.get('only_firmwares') == 'True' + inverted = request.args.get('inverted') == 'True' + query_title = None + yara_match_data = None + if is_uid(query_str): # cached binary search + cached_query = self.db.frontend.get_query_from_cache(query_str) + query = json.loads(cached_query.query) + query_title = cached_query.yara_rule + yara_match_data = cached_query.match_data + else: # regular / advanced search + query = apply_filters_to_query(request, query_str) + if request.args.get('date'): + query = self._add_date_to_query(query, request.args.get('date')) + search_target = ( + SearchParameters.TargetType.yara + if query_title + else SearchParameters.TargetType.file + if not only_firmware + else SearchParameters.TargetType.firmware + if not inverted + else SearchParameters.TargetType.inverted ) - search_parameters['inverted'] = ( - request.args.get('inverted') == 'True' if request.args.get('inverted') else inverted + return SearchParameters( + query=query, + inverted=inverted, + only_firmware=only_firmware, + search_target=search_target, + query_title=query_title or query, + yara_match_data=yara_match_data, ) - search_parameters['query'] = apply_filters_to_query(request, query) - if 'query_title' not in search_parameters: - search_parameters['query_title'] = search_parameters['query'] - if request.args.get('date'): - search_parameters['query'] = self._add_date_to_query(search_parameters['query'], request.args.get('date')) - return search_parameters def _search_database(self, query, skip=0, limit=0, only_firmwares=False, inverted=False): meta_list = self.db.frontend.generic_search( @@ -148,7 +180,8 @@ def _build_search_query(self): query['firmware_tags'] = {'$overlap': tags} return json.dumps(query) - def _add_hash_query_to_query(self, query, value): + @staticmethod + def _add_hash_query_to_query(query, value): # FIXME: The frontend should not need to know how the plugin is configured hash_types = ['md5', 'sha1', 'sha256', 'sha512', 'ripemd160', 'whirlpool'] hash_query = {f'processed_analysis.file_hashes.{hash_type}': value for hash_type in hash_types} @@ -210,7 +243,8 @@ def start_binary_search(self): error = 'please select a file or enter rules in the text area' return render_template('database/database_binary_search.html', error=error) - def _get_items_from_binary_search_request(self, req): + @staticmethod + def _get_items_from_binary_search_request(req): yara_rule_file = None if req.files.get('file'): _, yara_rule_file = get_file_name_and_binary_from_request(req) @@ -234,10 +268,11 @@ def get_binary_search_results(self): error = result elif result is not None: yara_rules = make_unicode_string(yara_rules[0]) - joined_results = self._join_results(result) - query_uid = self._store_binary_search_query(joined_results, yara_rules) + query_uid = self._store_binary_search_query(result, yara_rules) return redirect( - url_for('browse_database', query=query_uid, only_firmwares=request.args.get('only_firmware')) + url_for( + self.browse_database.__name__, query=query_uid, only_firmwares=request.args.get('only_firmware') + ) ) else: error = 'No request ID found' @@ -250,13 +285,14 @@ def get_binary_search_results(self): yara_rules=yara_rules, ) - def _store_binary_search_query(self, binary_search_results: list, yara_rules: str) -> str: - query = '{"_id": {"$in": ' + str(binary_search_results).replace("'", '"') + '}}' - return self.db.editing.add_to_search_query_cache(query, query_title=yara_rules) - - @staticmethod - def _join_results(result_dict): - return list(set(chain(*result_dict.values()))) + def _store_binary_search_query(self, binary_search_results: dict, yara_rules: str) -> str: + matching_uids = sorted(binary_search_results) + query = '{"_id": {"$in": ' + str(matching_uids).replace("'", '"') + '}}' + return self.db.editing.add_to_search_query_cache( + query, + match_data=binary_search_results, + query_title=yara_rules, + ) @roles_accepted(*PRIVILEGES['basic_search']) @AppRoute('/database/quick_search', GET) diff --git a/src/web_interface/components/jinja_filter.py b/src/web_interface/components/jinja_filter.py index 71bd04bcc..622d4afb2 100644 --- a/src/web_interface/components/jinja_filter.py +++ b/src/web_interface/components/jinja_filter.py @@ -116,10 +116,6 @@ def _virtual_path_element_to_span(hid_element: str, uid: str, root_uid: str, cur '' ) - @staticmethod - def _render_firmware_detail_tabular_field(firmware_meta_data): - return render_template('generic_view/firmware_detail_tabular_field.html', firmware=firmware_meta_data) - @staticmethod def _render_general_information_table( firmware: MetaEntry, root_uid: str, other_versions, selected_analysis, file_tree_paths @@ -188,7 +184,6 @@ def _setup_filters(self): # noqa: PLR0915 ] = flt.data_to_chart_with_value_percentage_pairs self._app.jinja_env.filters['decompress'] = flt.decompress self._app.jinja_env.filters['dict_to_json'] = json.dumps - self._app.jinja_env.filters['firmware_detail_tabular_field'] = self._render_firmware_detail_tabular_field self._app.jinja_env.filters['fix_cwe'] = flt.fix_cwe self._app.jinja_env.filters['format_duration'] = flt.format_duration self._app.jinja_env.filters['format_string_list_with_offset'] = flt.filter_format_string_list_with_offset diff --git a/src/web_interface/pagination.py b/src/web_interface/pagination.py index 95fa4438c..adf5c8d86 100644 --- a/src/web_interface/pagination.py +++ b/src/web_interface/pagination.py @@ -4,7 +4,7 @@ def get_pagination(**kwargs): - kwargs.setdefault('record_name', 'records') + kwargs.setdefault('record_name', 'results') return Pagination( css_framework='bootstrap4', link_size='sm', diff --git a/src/web_interface/rest/rest_binary_search.py b/src/web_interface/rest/rest_binary_search.py index 8c83986f2..1808f8ac3 100644 --- a/src/web_interface/rest/rest_binary_search.py +++ b/src/web_interface/rest/rest_binary_search.py @@ -74,4 +74,11 @@ def get(self, search_id=None): if result is None: return error_message('The result is not ready yet or it has already been fetched', self.URL) - return success_message({'binary_search_results': result}, self.URL) + # the "new" binary search result has the structure {: {: []}} + # we convert it back to the "old" structure {: []} in order to maintain compatibility + transposed_result = {} + for uid, uid_result in result.items(): + for rule in uid_result: + transposed_result.setdefault(rule, []).append(uid) + + return success_message({'binary_search_results': transposed_result}, self.URL) diff --git a/src/web_interface/static/js/show_analysis_preview.js b/src/web_interface/static/js/show_analysis_preview.js index 92fda352f..261c21e1c 100644 --- a/src/web_interface/static/js/show_analysis_preview.js +++ b/src/web_interface/static/js/show_analysis_preview.js @@ -1,4 +1,7 @@ const preview_loading_gif = document.getElementById("preview-loading-gif"); +const preview_button = document.getElementById("preview_button"); +const offset_input = document.getElementById("hex-preview-offset"); + function hide_gif(element) { element.style.display = "none"; } @@ -7,36 +10,40 @@ function init_preview() { if (isTextOrImage) { highlight_code(); } + preview_button.scrollIntoView(); + offset_input.focus(); } function highlight_code() { const block = $('div#preview-div pre')[0]; hljs.highlightElement(block); line_numbering(); } -function load_preview() { +function load_preview(offset = null, focus = false) { let resourcePath; document.getElementById("preview_button").onclick = () => false; + if (focus && offset !== null) { + document.getElementById("preview-div").classList.add("show"); + offset_input.value = offset; + } if (isTextOrImage) { resourcePath = `/ajax_get_binary/${mimeType}/${uid}`; } else { // hex preview - if ($("#hex-preview-offset").hasClass("is-invalid")) { - $("#hex-preview-offset").removeClass("is-invalid"); - } + offset_input.classList.remove("is-invalid"); $("#preview-content").html(""); - document.getElementById('hex-preview-form').style.display = "flex"; - let offset = parseInt(document.getElementById('hex-preview-offset').value); + document.getElementById("hex-preview-form").style.display = "flex"; + let offset = parseInt(offset_input.value); if (isNaN(offset)) { - $("#hex-preview-offset").addClass("is-invalid"); + offset_input.classList.add("is-invalid"); return; } - let length = document.getElementById('hex-preview-length').value; + let length = document.getElementById("hex-preview-length").value; resourcePath = `/ajax_get_hex_preview/${uid}/${offset}/${length}`; } preview_loading_gif.style.display = "block"; $("#preview-content").load(resourcePath, init_preview); } -document.getElementById("preview_button").onclick = load_preview; +preview_button.onclick = load_preview; let rawResultIsHighlighted = false; const toggleSwitch = document.getElementById("rawResultSwitch"); diff --git a/src/web_interface/templates/base.html b/src/web_interface/templates/base.html index 7b898f662..376ea85c3 100644 --- a/src/web_interface/templates/base.html +++ b/src/web_interface/templates/base.html @@ -49,62 +49,76 @@ {% endblock %} {% block styles %}{% endblock %} diff --git a/src/web_interface/templates/database/database_browse.html b/src/web_interface/templates/database/database_browse.html index a7c980c94..3314aa197 100644 --- a/src/web_interface/templates/database/database_browse.html +++ b/src/web_interface/templates/database/database_browse.html @@ -1,10 +1,34 @@ {% extends "base.html" %} +{% import 'macros.html' as macros %} + {% set active_page = "Database" %} {% block styles %} + {% endblock %} @@ -65,13 +89,15 @@

Browse Firmware Database

-{% if search_parameters['query_title'] %} +{# show query button #} +{% if search_parameters.query_title %}
{{ search_parameters['query_title'] | render_query_title }}
@@ -81,12 +107,59 @@

Browse Firmware Database

{% endif %} +{# search results #}
{% if firmware_list %}
    {% for firmware in firmware_list %} - {{ firmware | firmware_detail_tabular_field | safe }} + + {# single search result #} + {% call macros.fw_detail_tabular_field(firmware) %} + + {# binary search show matching strings button #} + {% if search_parameters.yara_match_data and firmware.uid in search_parameters.yara_match_data %} +
    + + + {# binary search matching strings #} +
    + + + + + + + + {% for rule, match_list in search_parameters.yara_match_data[firmware.uid].items() %} + {% for match in match_list %} + + + + + + + {% endfor %} + {# if there are more matches than can be displayed... #} + {% if (match_list | length) == 20 %} + + {% endif %} + {% endfor %} +
    RuleOffsetConditionMatching String
    {{ rule | safe }} + + {{ match.offset | safe }} + + {{ match.condition | safe }}{{ match.match | safe }}
    + Only the first 20 matches of rule "{{ rule }}" are displayed +
    +
    +
    + {% endif %} + {% endcall %} {% endfor %}
{% else %} diff --git a/src/web_interface/templates/generic_view/firmware_detail_tabular_field.html b/src/web_interface/templates/generic_view/firmware_detail_tabular_field.html deleted file mode 100644 index 8ea284c67..000000000 --- a/src/web_interface/templates/generic_view/firmware_detail_tabular_field.html +++ /dev/null @@ -1,12 +0,0 @@ -
  • - -
    - {{ firmware.hid }} - {{ firmware.tags | render_fw_tags(size=11) | safe }} -
    -
    -
    - {{ firmware.submission_date | nice_unix_time | safe }} - -
    -
  • diff --git a/src/web_interface/templates/home.html b/src/web_interface/templates/home.html index 90b816e7b..825bda537 100644 --- a/src/web_interface/templates/home.html +++ b/src/web_interface/templates/home.html @@ -1,5 +1,7 @@ {% extends "base.html" %} +{% import 'macros.html' as macros %} + {% set active_page = "Home" %} {% block head %} @@ -97,7 +99,8 @@

    General FACTs

    Latest Firmware Submissions

      {% for firmware in latest_firmware_submissions %} - {{ firmware | firmware_detail_tabular_field | safe }} + {% call macros.fw_detail_tabular_field(firmware) %} + {% endcall %} {% endfor %}
    diff --git a/src/web_interface/templates/macros.html b/src/web_interface/templates/macros.html index 78556879a..635d0fea1 100644 --- a/src/web_interface/templates/macros.html +++ b/src/web_interface/templates/macros.html @@ -33,3 +33,21 @@
    {{ panel_title }}<
    {%- endmacro %} + +{% macro fw_detail_tabular_field(firmware) %} +
  • +
    + +
    + {{ firmware.hid }} + {{ firmware.tags | render_fw_tags(size=11) | safe }} +
    +
    +
    + {{ firmware.submission_date | nice_unix_time | safe }} + +
    +
    + {{ caller() }} +
  • +{% endmacro %} diff --git a/src/web_interface/templates/show_analysis.html b/src/web_interface/templates/show_analysis.html index f562f54b5..d28b35927 100644 --- a/src/web_interface/templates/show_analysis.html +++ b/src/web_interface/templates/show_analysis.html @@ -450,12 +450,16 @@ radare_form.submit(); } document.addEventListener("DOMContentLoaded", function() { - // auto load summary if URL parameter "load_summary=true" is set const urlParams = new URLSearchParams(window.location.search); const summary = urlParams.get('load_summary'); + const preview = urlParams.get('load_preview') || false; const has_children = {{ "true" if firmware.files_included | length > 0 else "false" }}; if (summary === "true" && has_children && selected_analysis !== "None") { + // automatically load summary if URL parameter "load_summary=true" is set load_summary(uid, selected_analysis, focus=true); + } else if (preview !== false) { + // automatically load preview at address xyz if URL parameter "load_preview=xyz" is set + load_preview(preview, true); } });