From 77679cbe5736fc299f06f29d81923e8ac639542c Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Thu, 14 Mar 2024 23:28:21 +0530 Subject: [PATCH 1/7] ResultDocument output schema --- floss/qs/main.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/floss/qs/main.py b/floss/qs/main.py index 8cc2a542c..5ee3d66fe 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -123,6 +123,41 @@ def offset(self) -> int: "convenience" return self.string.slice.range.offset + def to_dict(self): + return { + "string": self.string, + "structure": self.structure, + "tags": list(self.tags), + "offset": self.offset + } + + +@dataclass +class ResultDocument: + slice: Slice + name: str + strings: List[TaggedString] + parent: Optional['ResultDocument'] = field(default=None) + children: Sequence['ResultDocument'] = field(default_factory=list) + + def add_string(self, string: TaggedString): + self.strings.append(string) + + def add_child(self, child: 'ResultDocument'): + self.children.append(child) + + def set_parent(self, parent: 'ResultDocument'): + self.parent = parent + + def to_dict(self): + return { + "slice": (self.slice.range.offset, self.slice.range.length), + "name": self.name, + "strings": [string.to_dict() for string in self.strings], + "parent": self.parent.name if self.parent else None, + "children": [child.to_dict() for child in self.children] + } + MIN_STR_LEN = 6 ASCII_BYTE = r" !\"#\$%&\'\(\)\*\+,-\./0123456789:;<=>\?@ABCDEFGHIJKLMNOPQRSTUVWXYZ\[\]\^_`abcdefghijklmnopqrstuvwxyz\{\|\}\\\~\t".encode( @@ -1079,6 +1114,15 @@ def has_visible_successors(layout: Layout) -> bool: return any(map(is_visible, layout.successors)) +def to_qs(layout: Layout) -> ResultDocument: + doc = ResultDocument(layout.slice, layout.name, layout.strings) + for child in layout.children: + child_doc = to_qs(child) # recursively convert all children + doc.add_child(child_doc) + child_doc.set_parent(doc) # set parent of children to be the current doc + return doc + + def render_strings( console: Console, layout: Layout, tag_rules: TagRules, depth: int = 0, name_hint: Optional[str] = None ): @@ -1250,6 +1294,8 @@ def main(): # hide (remove) strings according to the above rules hide_strings_by_rules(layout, tag_rules) + result_document = to_qs(layout) + console = Console() render_strings(console, layout, tag_rules) From 721757afd0e7a934b5579cc14a9ce62488e7cca5 Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Sat, 16 Mar 2024 19:19:23 +0530 Subject: [PATCH 2/7] ResultDoc dataclass with from_layout --- floss/qs/main.py | 76 ++++++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 34 deletions(-) diff --git a/floss/qs/main.py b/floss/qs/main.py index 5ee3d66fe..250949e97 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -11,6 +11,7 @@ import functools import itertools import contextlib +import dataclasses from collections import defaultdict from typing import Set, Dict, List, Union, Tuple, Literal, Callable, Iterable, Optional, Sequence from dataclasses import field, dataclass @@ -123,40 +124,29 @@ def offset(self) -> int: "convenience" return self.string.slice.range.offset - def to_dict(self): - return { - "string": self.string, - "structure": self.structure, - "tags": list(self.tags), - "offset": self.offset - } - @dataclass class ResultDocument: slice: Slice name: str - strings: List[TaggedString] - parent: Optional['ResultDocument'] = field(default=None) + strings: Sequence[TaggedString] = field(default_factory=list) children: Sequence['ResultDocument'] = field(default_factory=list) - def add_string(self, string: TaggedString): - self.strings.append(string) + @classmethod + def from_layout(cls, layout: 'Layout') -> "ResultDocument": + result = cls( + Slice(layout.slice.buf, (layout.slice.range.offset, layout.slice.range.length)), + layout.name, + [TaggedString( + string.string, + string.tags, + string.structure, + ) for string in layout.strings if isinstance(string, TaggedString)], + ) - def add_child(self, child: 'ResultDocument'): - self.children.append(child) + result.children = [cls.from_layout(child) for child in layout.children] - def set_parent(self, parent: 'ResultDocument'): - self.parent = parent - - def to_dict(self): - return { - "slice": (self.slice.range.offset, self.slice.range.length), - "name": self.name, - "strings": [string.to_dict() for string in self.strings], - "parent": self.parent.name if self.parent else None, - "children": [child.to_dict() for child in self.children] - } + return result MIN_STR_LEN = 6 @@ -1114,13 +1104,26 @@ def has_visible_successors(layout: Layout) -> bool: return any(map(is_visible, layout.successors)) -def to_qs(layout: Layout) -> ResultDocument: - doc = ResultDocument(layout.slice, layout.name, layout.strings) - for child in layout.children: - child_doc = to_qs(child) # recursively convert all children - doc.add_child(child_doc) - child_doc.set_parent(doc) # set parent of children to be the current doc - return doc +class QSJSONEncoder(json.JSONEncoder): + """ + serializes QS into JSON. + """ + + def default(self, o): + if dataclasses.is_dataclass(o): + if isinstance(o, Slice): + return o.range + elif isinstance(o, set): + return list(o) + else: + return dataclasses.asdict(o) + + +def render_json(console: Console, doc: ResultDocument): + console.print(json.dumps( + doc, + cls=QSJSONEncoder, + )) def render_strings( @@ -1225,6 +1228,7 @@ def main(): default=MIN_STR_LEN, help="minimum string length", ) + parser.add_argument("--json", action="store_true", help="print JSON representation of result") logging_group = parser.add_argument_group("logging arguments") logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") logging_group.add_argument( @@ -1294,10 +1298,14 @@ def main(): # hide (remove) strings according to the above rules hide_strings_by_rules(layout, tag_rules) - result_document = to_qs(layout) + result_document = ResultDocument.from_layout(layout) console = Console() - render_strings(console, layout, tag_rules) + + if args.json: + render_json(console, result_document) + else: + render_strings(console, layout, tag_rules) return 0 From 4b13b7638c61f503e5b2d38f9df410d9f672f6d6 Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Sun, 17 Mar 2024 14:04:33 +0530 Subject: [PATCH 3/7] modify render_strings --- floss/qs/main.py | 123 +++++++++++++++++++++++------------------------ 1 file changed, 60 insertions(+), 63 deletions(-) diff --git a/floss/qs/main.py b/floss/qs/main.py index 250949e97..e94f69036 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -127,26 +127,53 @@ def offset(self) -> int: @dataclass class ResultDocument: - slice: Slice + slice: 'Slice' name: str - strings: Sequence[TaggedString] = field(default_factory=list) + strings: Sequence['TaggedString'] = field(default_factory=list) + xor_key: Optional[str] = None children: Sequence['ResultDocument'] = field(default_factory=list) - @classmethod - def from_layout(cls, layout: 'Layout') -> "ResultDocument": - result = cls( - Slice(layout.slice.buf, (layout.slice.range.offset, layout.slice.range.length)), - layout.name, - [TaggedString( - string.string, - string.tags, - string.structure, - ) for string in layout.strings if isinstance(string, TaggedString)], - ) + def __post_init__(self): + for child in self.children: + child.parent = self + + @property + def visible_predecessors(self) -> bool: + current = self + while current is not None: + if current.strings: + return True + current = getattr(current, "parent", None) + return False + + @property + def visible_successors(self) -> bool: + return any(child.strings for child in self.children) or \ + any(child.visible_successors for child in self.children) + + @classmethod + def from_layout(cls, layout: 'Layout') -> 'ResultDocument': + result = cls(layout.slice, layout.name, + [TaggedString(string.string, + string.tags, + string.structure) + for string in layout.strings], + ) + if isinstance(layout, PELayout): + result.xor_key = layout.xor_key result.children = [cls.from_layout(child) for child in layout.children] return result + + def asdict(self): + return { 'slice': self.slice, + 'name': self.name, + 'strings': self.strings, + 'xor_key': self.xor_key, + 'children': [child.asdict() for child in self.children], + 'visible_predecessors': self.visible_predecessors, + 'visible_successors': self.visible_successors } MIN_STR_LEN = 6 @@ -629,28 +656,6 @@ def predecessors(self) -> Iterable["Layout"]: for i in range(index - 1, -1, -1): yield self.parent.children[i] - @property - def predecessor(self) -> Optional["Layout"]: - """traverse to the prior sibling""" - return next(iter(self.predecessors), None) - - @property - def successors(self) -> Iterable["Layout"]: - """traverse to the next siblings""" - if self.parent is None: - return None - - index = self.parent.children.index(self) - if index == len(self.parent.children) - 1: - return None - - for i in range(index + 1, len(self.parent.children)): - yield self.parent.children[i] - - @property - def successor(self) -> Optional["Layout"]: - """traverse to the next sibling""" - return next(iter(self.successors), None) def add_child(self, child: "Layout"): # this works in py3.11, though mypy gets confused, @@ -1087,23 +1092,15 @@ def hide_strings_by_rules(layout: Layout, tag_rules: TagRules): hide_strings_by_rules(child, tag_rules) -def has_visible_children(layout: Layout) -> bool: +def has_visible_children(layout: ResultDocument) -> bool: return any(map(is_visible, layout.children)) -def is_visible(layout: Layout) -> bool: +def is_visible(layout: ResultDocument) -> bool: "a layout is visible if it has any strings (or its children do)" return bool(layout.strings) or has_visible_children(layout) -def has_visible_predecessors(layout: Layout) -> bool: - return any(map(is_visible, layout.predecessors)) - - -def has_visible_successors(layout: Layout) -> bool: - return any(map(is_visible, layout.successors)) - - class QSJSONEncoder(json.JSONEncoder): """ serializes QS into JSON. @@ -1121,18 +1118,18 @@ def default(self, o): def render_json(console: Console, doc: ResultDocument): console.print(json.dumps( - doc, + doc.asdict(), cls=QSJSONEncoder, )) def render_strings( - console: Console, layout: Layout, tag_rules: TagRules, depth: int = 0, name_hint: Optional[str] = None + console: Console, results: ResultDocument, tag_rules: TagRules, depth: int = 0, name_hint: Optional[str] = None ): - if not is_visible(layout): + if not is_visible(results): return - if len(layout.children) == 1 and layout.slice.range == layout.children[0].slice.range: + if len(results.children) == 1 and results.slice.range == results.children[0].slice.range: # when a layout is completely dominated by its single child # then we can directly render the child, # retaining just a hint of the parent's name. @@ -1140,13 +1137,13 @@ def render_strings( # for example: # # rsrc: BINARY/102/0 (pe) - return render_strings(console, layout.children[0], tag_rules, depth, name_hint=layout.name) + return render_strings(console, results.children[0], tag_rules, depth, name_hint=results.name) BORDER_STYLE = MUTED_STYLE - name = layout.name - if isinstance(layout, PELayout) and layout.xor_key: # Check if the layout is PELayout and is xored - name += f" (XOR decoded with key: 0x{layout.xor_key:x})" + name = results.name + if results.xor_key: # Check if xor_key present + name += f" (XOR decoded with key: 0x{results.xor_key:x})" if name_hint: name = f"{name_hint} ({name})" @@ -1159,7 +1156,7 @@ def render_strings( name_offset = header.plain.index(" ") + 1 header.stylize(Style(color="blue"), name_offset, name_offset + len(name)) - if not has_visible_predecessors(layout): + if not results.visible_predecessors: header_shape = "┓" else: header_shape = "┫" @@ -1177,20 +1174,20 @@ def render_string_line(console: Console, tag_rules: TagRules, string: TaggedStri line.append_text(Span("┃" * (depth + 1), style=BORDER_STYLE)) console.print(line) - if not layout.children: + if not results.children: # for string in layout.strings[:4]: - for string in layout.strings: + for string in results.strings: render_string_line(console, tag_rules, string, depth) else: - for i, child in enumerate(layout.children): + for i, child in enumerate(results.children): if i == 0: # render strings before first child - strings_before_child = list(filter(lambda s: layout.offset <= s.offset < child.offset, layout.strings)) + strings_before_child = list(filter(lambda s: results.offset <= s.offset < child.offset, results.strings)) else: # render strings between children - last_child = layout.children[i - 1] - strings_before_child = list(filter(lambda s: last_child.end < s.offset < child.offset, layout.strings)) + last_child = results.children[i - 1] + strings_before_child = list(filter(lambda s: last_child.end < s.offset < child.offset, results.strings)) # for string in strings_before_child[:4]: for string in strings_before_child: @@ -1199,12 +1196,12 @@ def render_string_line(console: Console, tag_rules: TagRules, string: TaggedStri render_strings(console, child, tag_rules, depth + 1) # render strings after last child - strings_after_children = list(filter(lambda s: child.end < s.offset < layout.end, layout.strings)) + strings_after_children = list(filter(lambda s: child.end < s.offset < results.end, results.strings)) # for string in strings_after_children[:4]: for string in strings_after_children: render_string_line(console, tag_rules, string, depth) - if not has_visible_successors(layout): + if not results.visible_successors: footer = Span("", style=BORDER_STYLE) footer.align("center", width=console.width, character="━") @@ -1305,7 +1302,7 @@ def main(): if args.json: render_json(console, result_document) else: - render_strings(console, layout, tag_rules) + render_strings(console, result_document, tag_rules) return 0 From 60d4492fcbe2f81dfe48d3f03efda77a230a6d76 Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Mon, 18 Mar 2024 00:33:59 +0530 Subject: [PATCH 4/7] fix broken logic --- floss/qs/main.py | 70 +++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/floss/qs/main.py b/floss/qs/main.py index e94f69036..6793d005d 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -133,25 +133,54 @@ class ResultDocument: xor_key: Optional[str] = None children: Sequence['ResultDocument'] = field(default_factory=list) + parent: Optional['ResultDocument'] = field(default=None) - def __post_init__(self): - for child in self.children: - child.parent = self + @property + def predecessors(self) -> Iterable['ResultDocument']: + """traverse to the prior siblings""" + if self.parent is None: + return None + + index = self.parent.children.index(self) + if index == 0: + return None + + for i in range(index - 1, -1, -1): + yield self.parent.children[i] + + @property + def predecessor(self) -> Optional['ResultDocument']: + """traverse to the prior sibling""" + return next(iter(self.predecessors), None) + + @property + def successors(self) -> Iterable['ResultDocument']: + """traverse to the next siblings""" + if self.parent is None: + return None + + index = self.parent.children.index(self) + if index == len(self.parent.children) - 1: + return None + + for i in range(index + 1, len(self.parent.children)): + yield self.parent.children[i] + + @property + def successor(self) -> Optional['ResultDocument']: + """traverse to the next sibling""" + return next(iter(self.successors), None) @property def visible_predecessors(self) -> bool: - current = self - while current is not None: - if current.strings: - return True - current = getattr(current, "parent", None) - return False + """Check if there is any predecessor with strings.""" + return any(predecessor.strings for predecessor in self.predecessors) @property def visible_successors(self) -> bool: - return any(child.strings for child in self.children) or \ - any(child.visible_successors for child in self.children) - + """Check if there is any successor or further descendant with strings.""" + return any(successor.strings or successor.visible_successors for successor in self.successors) + @classmethod def from_layout(cls, layout: 'Layout') -> 'ResultDocument': result = cls(layout.slice, layout.name, @@ -163,7 +192,8 @@ def from_layout(cls, layout: 'Layout') -> 'ResultDocument': if isinstance(layout, PELayout): result.xor_key = layout.xor_key result.children = [cls.from_layout(child) for child in layout.children] - + for child in result.children: + child.parent = result return result def asdict(self): @@ -643,20 +673,6 @@ class Layout(abc.ABC): # so they come from before/between/after the children ranges. strings: List[TaggedString] = field(init=False, default_factory=list) - @property - def predecessors(self) -> Iterable["Layout"]: - """traverse to the prior siblings`""" - if self.parent is None: - return None - - index = self.parent.children.index(self) - if index == 0: - return None - - for i in range(index - 1, -1, -1): - yield self.parent.children[i] - - def add_child(self, child: "Layout"): # this works in py3.11, though mypy gets confused, # maybe due to the use of the key function. From 75bef941b9cac9ae506bf4d02b4608854bd696e9 Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Mon, 18 Mar 2024 00:48:06 +0530 Subject: [PATCH 5/7] new fields --- floss/qs/main.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/floss/qs/main.py b/floss/qs/main.py index 6793d005d..703af4659 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -132,6 +132,8 @@ class ResultDocument: strings: Sequence['TaggedString'] = field(default_factory=list) xor_key: Optional[str] = None children: Sequence['ResultDocument'] = field(default_factory=list) + visible_predecessor: Optional[bool] = field(default=None) + visible_successor: Optional[bool] = field(default=None) parent: Optional['ResultDocument'] = field(default=None) @@ -191,9 +193,18 @@ def from_layout(cls, layout: 'Layout') -> 'ResultDocument': ) if isinstance(layout, PELayout): result.xor_key = layout.xor_key + result.children = [cls.from_layout(child) for child in layout.children] + + # Set parent for children for child in result.children: child.parent = result + + # Set visible_predecessors and visible_successors + for child in result.children: + child.visible_predecessor = child.visible_predecessors + child.visible_successor = child.visible_successors + return result def asdict(self): From 4daed607936d411975dfc7caf54b76f0459e6fb8 Mon Sep 17 00:00:00 2001 From: ooprathamm-college Date: Mon, 18 Mar 2024 10:53:31 +0530 Subject: [PATCH 6/7] missing property + update render_strings --- floss/qs/main.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/floss/qs/main.py b/floss/qs/main.py index 703af4659..9c4e869c7 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -183,6 +183,16 @@ def visible_successors(self) -> bool: """Check if there is any successor or further descendant with strings.""" return any(successor.strings or successor.visible_successors for successor in self.successors) + @property + def offset(self) -> int: + "convenience" + return self.slice.range.offset + + @property + def end(self) -> int: + "convenience" + return self.slice.range.end + @classmethod def from_layout(cls, layout: 'Layout') -> 'ResultDocument': result = cls(layout.slice, layout.name, @@ -1183,7 +1193,7 @@ def render_strings( name_offset = header.plain.index(" ") + 1 header.stylize(Style(color="blue"), name_offset, name_offset + len(name)) - if not results.visible_predecessors: + if not results.visible_predecessor: header_shape = "┓" else: header_shape = "┫" @@ -1228,7 +1238,7 @@ def render_string_line(console: Console, tag_rules: TagRules, string: TaggedStri for string in strings_after_children: render_string_line(console, tag_rules, string, depth) - if not results.visible_successors: + if not results.visible_successor: footer = Span("", style=BORDER_STYLE) footer.align("center", width=console.width, character="━") From 2caf018bbb6f9dad4e8b6605456b7166e206b20d Mon Sep 17 00:00:00 2001 From: ooprathamm Date: Wed, 20 Mar 2024 11:08:38 +0530 Subject: [PATCH 7/7] Made taggers return db keys --- floss/qs/db/expert.py | 16 ++++++---- floss/qs/db/gp.py | 10 ++++-- floss/qs/main.py | 73 +++++++++++++++++++++++++------------------ 3 files changed, 59 insertions(+), 40 deletions(-) diff --git a/floss/qs/db/expert.py b/floss/qs/db/expert.py index 738720f98..7bf7a2ac3 100644 --- a/floss/qs/db/expert.py +++ b/floss/qs/db/expert.py @@ -30,25 +30,29 @@ class ExpertStringDatabase: def __len__(self) -> int: return len(self.string_rules) + len(self.substring_rules) + len(self.regex_rules) - def query(self, s: str) -> Set[str]: - ret = set() + def query(self, s: str) -> Tuple[Set[str], List[ExpertRule]]: + ret_set = set() + ret_list = list() if s in self.string_rules: - ret.add(self.string_rules[s].tag) + ret_set.add(self.string_rules[s].tag) + ret_list.append(self.string_rules[s]) # note that this is O(m * n) # #strings * #rules for rule in self.substring_rules: if rule.value in s: - ret.add(rule.tag) + ret_set.add(rule.tag) + ret_list.append(rule) # note that this is O(m * n) # #strings * #rules for rule, regex in self.regex_rules: if regex.search(s): - ret.add(rule.tag) + ret_set.add(rule.tag) + ret_list.append(rule) - return ret + return ret_set, ret_list @classmethod def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase": diff --git a/floss/qs/db/gp.py b/floss/qs/db/gp.py index 0b2f503b6..d13a4e0d3 100644 --- a/floss/qs/db/gp.py +++ b/floss/qs/db/gp.py @@ -105,12 +105,16 @@ def __contains__(self, other: bytes | str) -> bool: if isinstance(other, bytes): return other in self.string_hashes elif isinstance(other, str): - m = hashlib.md5() - m.update(other.encode("utf-8")) - return m.digest()[:8] in self.string_hashes + return self.get_hash(other) in self.string_hashes else: raise ValueError("other must be bytes or str") + @staticmethod + def get_hash(string: str) -> bytes: + m = hashlib.md5() + m.update(string.encode("utf-8")) + return m.digest()[:8] + @classmethod def from_file(cls, path: pathlib.Path) -> "StringHashDatabase": string_hashes: Set[bytes] = set() diff --git a/floss/qs/main.py b/floss/qs/main.py index 9c4e869c7..94be9febf 100644 --- a/floss/qs/main.py +++ b/floss/qs/main.py @@ -17,6 +17,7 @@ from dataclasses import field, dataclass import pefile +import msgspec import colorama import lancelot import rich.traceback @@ -29,9 +30,9 @@ import floss.qs.db.oss import floss.qs.db.expert import floss.qs.db.winapi -from floss.qs.db.gp import StringHashDatabase, StringGlobalPrevalenceDatabase -from floss.qs.db.oss import OpenSourceStringDatabase -from floss.qs.db.expert import ExpertStringDatabase +from floss.qs.db.gp import StringGlobalPrevalence, StringHashDatabase, StringGlobalPrevalenceDatabase +from floss.qs.db.oss import OpenSourceString, OpenSourceStringDatabase +from floss.qs.db.expert import ExpertRule, ExpertStringDatabase from floss.qs.db.winapi import WindowsApiStringDatabase logger = logging.getLogger("quantumstrand") @@ -112,12 +113,14 @@ class ExtractedString: Tag = str +MetadataType = Union[StringGlobalPrevalence, OpenSourceString, ExpertRule] @dataclass class TaggedString: string: ExtractedString tags: Set[Tag] structure: str = "" + meta: Sequence[MetadataType] = field(default_factory=list) @property def offset(self) -> int: @@ -198,7 +201,8 @@ def from_layout(cls, layout: 'Layout') -> 'ResultDocument': result = cls(layout.slice, layout.name, [TaggedString(string.string, string.tags, - string.structure) + string.structure, + string.meta) for string in layout.strings], ) if isinstance(layout, PELayout): @@ -484,71 +488,72 @@ def get_reloc_offsets(slice: Slice, pe: pefile.PE) -> Set[int]: def check_is_xor(xor_key: Union[int, None]): if isinstance(xor_key, int): - return ("#decoded",) + return ("#decoded",), [] - return () + return (), [] def check_is_reloc(reloc_offsets: Set[int], string: ExtractedString): for addr in string.slice.range: if addr in reloc_offsets: - return ("#reloc",) + return ("#reloc",), [] - return () + return (), [] def check_is_code(code_offsets: Set[int], string: ExtractedString): for addr in string.slice.range: if addr in code_offsets: - return ("#code",) + return ("#code",), [] - return () + return (), [] def query_code_string_database(db: StringGlobalPrevalenceDatabase, string: str): if db.query(string): - return ("#code-junk",) + return ("#code-junk",), db.query(string) - return () + return (), [] def query_global_prevalence_database(db: StringGlobalPrevalenceDatabase, string: str): if db.query(string): - return ("#common",) + return ("#common",), db.query(string) - return () + return (), [] def query_global_prevalence_hash_database(db: StringHashDatabase, string: str): if string in db: - return ("#common",) + return ("#common",), [{'hash': str(db.get_hash(string))}] - return () + return (), [] -def query_library_string_database(db: OpenSourceStringDatabase, string: str) -> Sequence[Tag]: +def query_library_string_database(db: OpenSourceStringDatabase, string: str): meta = db.metadata_by_string.get(string) if not meta: - return () + return (), [] - return (f"#{meta.library_name}",) + return (f"#{meta.library_name}",), meta -def query_expert_string_database(db: ExpertStringDatabase, string: str) -> Sequence[Tag]: - return tuple(db.query(string)) +def query_expert_string_database(db: ExpertStringDatabase, string: str): + tag, meta = db.query(string) + return tag, meta -def query_winapi_name_database(db: WindowsApiStringDatabase, string: str) -> Sequence[Tag]: +def query_winapi_name_database(db: WindowsApiStringDatabase, string: str): if string.lower() in db.dll_names: - return ("#winapi",) + return ("#winapi",), [] if string in db.api_names: - return ("#winapi",) + return ("#winapi",), [] - return () + return (), [] -Tagger = Callable[[ExtractedString], Sequence[Tag]] +Tagger = Callable[[ExtractedString], Tuple[Sequence[Tag], MetadataType]] def load_databases() -> Sequence[Tagger]: @@ -557,7 +562,7 @@ def load_databases() -> Sequence[Tagger]: def query_database(db, queryfn, string: ExtractedString): return queryfn(db, string.string) - def make_tagger(db, queryfn) -> Sequence[Tag]: + def make_tagger(db, queryfn) -> Tuple[Sequence[Tag], MetadataType]: return functools.partial(query_database, db, queryfn) for db in floss.qs.db.winapi.get_default_databases(): @@ -728,6 +733,7 @@ def tag_strings(self, taggers: Sequence[Tagger]): # this routine will transform them into TaggedStrings. assert isinstance(string, ExtractedString) tags: Set[Tag] = set() + metas: List[MetadataType] = list() string_counts[string.string] += 1 @@ -735,9 +741,12 @@ def tag_strings(self, taggers: Sequence[Tagger]): tags.add("#duplicate") for tagger in taggers: - tags.update(tagger(string)) + tag, meta = tagger(string) + tags.update(tag) + if meta != [] and meta not in metas: + metas.append(meta) - tagged_strings.append(TaggedString(string, tags)) + tagged_strings.append(TaggedString(string, tags, meta=metas)) self.strings = tagged_strings for child in self.children: @@ -1144,11 +1153,13 @@ class QSJSONEncoder(json.JSONEncoder): """ def default(self, o): + if isinstance(o, set): + return list(o) + if isinstance(o, MetadataType): + return json.loads(msgspec.json.encode(o)) if dataclasses.is_dataclass(o): if isinstance(o, Slice): return o.range - elif isinstance(o, set): - return list(o) else: return dataclasses.asdict(o)