@staticmethod
def get_hash(string: str) -> bytes:
    """Return the lookup key for *string* in the string-hash database.

    The key is the first eight bytes of the MD5 digest of the UTF-8
    encoding of *string*; this is the value that ``__contains__`` tests
    against ``string_hashes`` when it is given a ``str``.

    Args:
        string: the text to fingerprint.

    Returns:
        An 8-byte ``bytes`` object.
    """
    # MD5 serves purely as a compact fingerprint here, not as a security
    # primitive. Saying so explicitly keeps the call usable on interpreters
    # built against a FIPS-restricted OpenSSL, where a plain hashlib.md5()
    # may be refused.
    digest = hashlib.md5(string.encode("utf-8"), usedforsecurity=False).digest()
    return digest[:8]
@dataclass
class ResultDocument:
    """Render-ready copy of one node of a layout tree.

    A document carries the slice, name and tagged strings of a layout node,
    links to its child documents, and a back-reference to its parent so that
    sibling relationships can be queried without going back to the layout.
    Instances are normally created with :meth:`from_layout`.
    """

    slice: "Slice"
    name: str
    strings: Sequence["TaggedString"] = field(default_factory=list)
    # the text renderer formats this value with ``:x``, so it is an integer key
    xor_key: Optional[int] = None
    children: Sequence["ResultDocument"] = field(default_factory=list)
    # filled in by from_layout() for every non-root node; None means "not computed"
    visible_predecessor: Optional[bool] = field(default=None)
    visible_successor: Optional[bool] = field(default=None)

    # Back-reference to the owning node. It is left out of comparisons because
    # parent -> children -> parent would otherwise recurse without end, and out
    # of the repr to keep the output readable.
    parent: Optional["ResultDocument"] = field(default=None, repr=False, compare=False)

    def _sibling_position(self) -> Optional[int]:
        """Return this node's index among its parent's children (None for a root).

        Raises:
            ValueError: if the node has a parent but is not one of its children.
        """
        if self.parent is None:
            return None

        # Match by identity: dataclass equality is field-wise, so two distinct
        # siblings with equal fields would be indistinguishable to list.index().
        for position, sibling in enumerate(self.parent.children):
            if sibling is self:
                return position

        raise ValueError(f"{self.name!r} is not a child of its parent")

    def _has_visible_content(self) -> bool:
        """True if this node, or any node below it, has at least one string."""
        return bool(self.strings) or any(child._has_visible_content() for child in self.children)

    @property
    def predecessors(self) -> Iterable["ResultDocument"]:
        """traverse to the prior siblings, nearest first"""
        position = self._sibling_position()
        if position is None:
            return

        yield from reversed(self.parent.children[:position])

    @property
    def predecessor(self) -> Optional["ResultDocument"]:
        """traverse to the prior sibling, or None if there is none"""
        return next(iter(self.predecessors), None)

    @property
    def successors(self) -> Iterable["ResultDocument"]:
        """traverse to the next siblings, nearest first"""
        position = self._sibling_position()
        if position is None:
            return

        yield from self.parent.children[position + 1 :]

    @property
    def successor(self) -> Optional["ResultDocument"]:
        """traverse to the next sibling, or None if there is none"""
        return next(iter(self.successors), None)

    @property
    def visible_predecessors(self) -> bool:
        """Check if any prior sibling has strings, directly or in a descendant."""
        return any(sibling._has_visible_content() for sibling in self.predecessors)

    @property
    def visible_successors(self) -> bool:
        """Check if any later sibling has strings, directly or in a descendant.

        Each sibling is inspected exactly once; there is no need to ask a
        sibling about *its* successors, because those are already part of
        this node's own successor sequence.
        """
        return any(sibling._has_visible_content() for sibling in self.successors)

    @property
    def offset(self) -> int:
        "convenience: start offset of the underlying slice"
        return self.slice.range.offset

    @property
    def end(self) -> int:
        "convenience: end offset of the underlying slice"
        return self.slice.range.end

    @classmethod
    def from_layout(cls, layout: "Layout") -> "ResultDocument":
        """Build a document tree mirroring *layout* and its children.

        Strings are copied into fresh ``TaggedString`` objects, the XOR key is
        carried over for PE layouts, and every child document gets its
        ``parent`` link and its cached ``visible_predecessor`` /
        ``visible_successor`` flags. The returned root keeps both flags as
        ``None`` since it has no siblings to inspect.
        """
        copied_strings = [
            TaggedString(string.string, string.tags, string.structure, string.meta)
            for string in layout.strings
        ]
        result = cls(layout.slice, layout.name, copied_strings)
        if isinstance(layout, PELayout):
            result.xor_key = layout.xor_key

        result.children = [cls.from_layout(child) for child in layout.children]

        # the sibling queries below walk parent.children, so link parents first
        for child in result.children:
            child.parent = result

        for child in result.children:
            child.visible_predecessor = child.visible_predecessors
            child.visible_successor = child.visible_successors

        return result

    def asdict(self) -> dict:
        """Return a plain-dict view of this node with children converted recursively.

        ``slice`` and ``strings`` are passed through unchanged, so a caller
        that serializes the result needs an encoder that understands them.
        The ``parent`` link is intentionally omitted to keep the structure
        acyclic.
        """
        return {
            "slice": self.slice,
            "name": self.name,
            "strings": self.strings,
            "xor_key": self.xor_key,
            "children": [child.asdict() for child in self.children],
            "visible_predecessors": self.visible_predecessors,
            "visible_successors": self.visible_successors,
        }
query_expert_string_database(db: ExpertStringDatabase, string: str) -> Sequence[Tag]: - return tuple(db.query(string)) +def query_expert_string_database(db: ExpertStringDatabase, string: str): + tag, meta = db.query(string) + return tag, meta -def query_winapi_name_database(db: WindowsApiStringDatabase, string: str) -> Sequence[Tag]: +def query_winapi_name_database(db: WindowsApiStringDatabase, string: str): if string.lower() in db.dll_names: - return ("#winapi",) + return ("#winapi",), [] if string in db.api_names: - return ("#winapi",) + return ("#winapi",), [] - return () + return (), [] -Tagger = Callable[[ExtractedString], Sequence[Tag]] +Tagger = Callable[[ExtractedString], Tuple[Sequence[Tag], MetadataType]] def load_databases() -> Sequence[Tagger]: @@ -454,7 +562,7 @@ def load_databases() -> Sequence[Tagger]: def query_database(db, queryfn, string: ExtractedString): return queryfn(db, string.string) - def make_tagger(db, queryfn) -> Sequence[Tag]: + def make_tagger(db, queryfn) -> Tuple[Sequence[Tag], MetadataType]: return functools.partial(query_database, db, queryfn) for db in floss.qs.db.winapi.get_default_databases(): @@ -591,42 +699,6 @@ class Layout(abc.ABC): # so they come from before/between/after the children ranges. 
strings: List[TaggedString] = field(init=False, default_factory=list) - @property - def predecessors(self) -> Iterable["Layout"]: - """traverse to the prior siblings`""" - if self.parent is None: - return None - - index = self.parent.children.index(self) - if index == 0: - return None - - for i in range(index - 1, -1, -1): - yield self.parent.children[i] - - @property - def predecessor(self) -> Optional["Layout"]: - """traverse to the prior sibling""" - return next(iter(self.predecessors), None) - - @property - def successors(self) -> Iterable["Layout"]: - """traverse to the next siblings""" - if self.parent is None: - return None - - index = self.parent.children.index(self) - if index == len(self.parent.children) - 1: - return None - - for i in range(index + 1, len(self.parent.children)): - yield self.parent.children[i] - - @property - def successor(self) -> Optional["Layout"]: - """traverse to the next sibling""" - return next(iter(self.successors), None) - def add_child(self, child: "Layout"): # this works in py3.11, though mypy gets confused, # maybe due to the use of the key function. @@ -661,6 +733,7 @@ def tag_strings(self, taggers: Sequence[Tagger]): # this routine will transform them into TaggedStrings. 
class QSJSONEncoder(json.JSONEncoder):
    """
    serializes QS into JSON.

    Handles the values that the standard encoder cannot: sets, the metadata
    records covered by ``MetadataType``, and dataclass instances (with
    ``Slice`` reduced to its ``range``). Anything else is rejected with the
    standard ``TypeError`` rather than being written out as ``null``.
    """

    def default(self, o):
        """Convert *o* into something the base encoder can serialize.

        Raises:
            TypeError: if *o* is of a type this encoder does not know about.
        """
        if isinstance(o, (set, frozenset)):
            # set iteration order depends on hashing, so sort where possible
            # to keep the emitted document stable from run to run.
            try:
                return sorted(o)
            except TypeError:
                # elements that cannot be ordered against each other
                return list(o)

        # NOTE(review): MetadataType is a typing.Union alias; isinstance()
        # against it needs Python 3.10+ - confirm the supported versions.
        if isinstance(o, MetadataType):
            # round-trip through msgspec to obtain plain JSON-compatible values
            return json.loads(msgspec.json.encode(o))

        # is_dataclass() is also true for dataclass *classes*, which asdict()
        # cannot convert, so restrict this branch to instances.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            if isinstance(o, Slice):
                return o.range
            return dataclasses.asdict(o)

        # let the base class raise TypeError for everything else
        return super().default(o)
@@ -1093,13 +1185,13 @@ def render_strings( # for example: # # rsrc: BINARY/102/0 (pe) - return render_strings(console, layout.children[0], tag_rules, depth, name_hint=layout.name) + return render_strings(console, results.children[0], tag_rules, depth, name_hint=results.name) BORDER_STYLE = MUTED_STYLE - name = layout.name - if isinstance(layout, PELayout) and layout.xor_key: # Check if the layout is PELayout and is xored - name += f" (XOR decoded with key: 0x{layout.xor_key:x})" + name = results.name + if results.xor_key: # Check if xor_key present + name += f" (XOR decoded with key: 0x{results.xor_key:x})" if name_hint: name = f"{name_hint} ({name})" @@ -1112,7 +1204,7 @@ def render_strings( name_offset = header.plain.index(" ") + 1 header.stylize(Style(color="blue"), name_offset, name_offset + len(name)) - if not has_visible_predecessors(layout): + if not results.visible_predecessor: header_shape = "┓" else: header_shape = "┫" @@ -1130,20 +1222,20 @@ def render_string_line(console: Console, tag_rules: TagRules, string: TaggedStri line.append_text(Span("┃" * (depth + 1), style=BORDER_STYLE)) console.print(line) - if not layout.children: + if not results.children: # for string in layout.strings[:4]: - for string in layout.strings: + for string in results.strings: render_string_line(console, tag_rules, string, depth) else: - for i, child in enumerate(layout.children): + for i, child in enumerate(results.children): if i == 0: # render strings before first child - strings_before_child = list(filter(lambda s: layout.offset <= s.offset < child.offset, layout.strings)) + strings_before_child = list(filter(lambda s: results.offset <= s.offset < child.offset, results.strings)) else: # render strings between children - last_child = layout.children[i - 1] - strings_before_child = list(filter(lambda s: last_child.end < s.offset < child.offset, layout.strings)) + last_child = results.children[i - 1] + strings_before_child = list(filter(lambda s: last_child.end < s.offset < 
child.offset, results.strings)) # for string in strings_before_child[:4]: for string in strings_before_child: @@ -1152,12 +1244,12 @@ def render_string_line(console: Console, tag_rules: TagRules, string: TaggedStri render_strings(console, child, tag_rules, depth + 1) # render strings after last child - strings_after_children = list(filter(lambda s: child.end < s.offset < layout.end, layout.strings)) + strings_after_children = list(filter(lambda s: child.end < s.offset < results.end, results.strings)) # for string in strings_after_children[:4]: for string in strings_after_children: render_string_line(console, tag_rules, string, depth) - if not has_visible_successors(layout): + if not results.visible_successor: footer = Span("", style=BORDER_STYLE) footer.align("center", width=console.width, character="━") @@ -1181,6 +1273,7 @@ def main(): default=MIN_STR_LEN, help="minimum string length", ) + parser.add_argument("--json", action="store_true", help="print JSON representation of result") logging_group = parser.add_argument_group("logging arguments") logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( @@ -1250,8 +1343,14 @@ def main(): # hide (remove) strings according to the above rules hide_strings_by_rules(layout, tag_rules) + result_document = ResultDocument.from_layout(layout) + console = Console() - render_strings(console, layout, tag_rules) + + if args.json: + render_json(console, result_document) + else: + render_strings(console, result_document, tag_rules) return 0