Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Custom UTF-8 Decoder #885

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
112 changes: 112 additions & 0 deletions floss/language/rust/decode_utf8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
import sys
import logging
import pathlib
import argparse
from typing import List, Tuple, Iterable, Optional

import pefile

MIN_STR_LEN = 4

logger = logging.getLogger(__name__)


def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure:
for section in pe.sections:
if section.Name.startswith(b".rdata\x00"):
return section

raise ValueError("no .rdata section found")
Arker123 marked this conversation as resolved.
Show resolved Hide resolved


def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a function that's signature is analog to extract_ascii_strings, i.e. operate on buffers?
this can then be wrapped by this or a similar function.

"""
Extracts UTF-8 strings from the .rdata section of a PE file.
"""
try:
rdata_section = get_rdata_section(pe)
except ValueError as e:
print("cannot extract rust strings: %s", e)
return []

strings = rdata_section.get_data()

character_and_index = []

# Reference: https://en.wikipedia.org/wiki/UTF-8

for i in range(0, len(strings)):
# for 1 byte
if strings[i] & 0x80 == 0x00:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add tests for various encoded strings to show they're properly decoded

character = strings[i].to_bytes(1, "big").decode("utf-8", "ignore")
character_and_index.append([character, i, 1])
Arker123 marked this conversation as resolved.
Show resolved Hide resolved

# for 2 bytes
elif strings[i] & 0xE0 == 0xC0:
temp = strings[i] << 8 | strings[i + 1]
character = temp.to_bytes(2, "big").decode("utf-8", "ignore")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you use ignore here? wouldn't we want to handle the case that invalid UTF-8 data is encountered (and not extract a string there)?

I assume that your algorithm works pretty well, since you've opened the PR, but I can't quite follow how it works. Would you please add some comments explaining the design, and definitely a few test cases that exercise each of the branch arms?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, the tests for each branch are in tests/test_utf8_decoder.py. Let me know if anything else is required.

i += 1
character_and_index.append([character, i, 2])

# for 3 bytes
elif strings[i] & 0xF0 == 0xE0:
temp = strings[i] << 16 | strings[i + 1] << 8 | strings[i + 2]
character = temp.to_bytes(3, "big").decode("utf-8", "ignore")
i += 2
character_and_index.append([character, i, 3])

# for 4 bytes
elif strings[i] & 0xF8 == 0xF0:
temp = strings[i] << 24 | strings[i + 1] << 16 | strings[i + 2] << 8 | strings[i + 3]
character = temp.to_bytes(4, "big").decode("utf-8", "ignore")
i += 3
character_and_index.append([character, i, 4])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the other cases? are there any?

either way, please add an else branch to handle them, either logging or doing an assertion.


strings = [] # string, start index, end index

prev = False

for i in range(0, len(character_and_index)):
if character_and_index[i][0].isprintable() == True:
if prev == False:
strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]])
prev = True
else:
strings[-1][0] += character_and_index[i][0]
strings[-1][2] = character_and_index[i][1]
else:
prev = False

# filter strings less than min length
strings = [string for string in strings if len(string[0]) >= min_length]

return strings


def main(argv=None):
parser = argparse.ArgumentParser(description="Get Rust strings")
parser.add_argument("path", help="file or path to analyze")
parser.add_argument(
"-n",
"--minimum-length",
dest="min_length",
type=int,
default=MIN_STR_LEN,
help="minimum string length",
)
args = parser.parse_args(args=argv)

logging.basicConfig(level=logging.DEBUG)

pe = pathlib.Path(args.path)
buf = pe.read_bytes()
pe = pefile.PE(data=buf, fast_load=True)

strings = extract_utf8_strings(pe, args.min_length)
for string in strings:
print(string[0])


if __name__ == "__main__":
sys.exit(main())
16 changes: 6 additions & 10 deletions floss/language/rust/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from floss.results import StaticString, StringEncoding
from floss.language.utils import find_lea_xrefs, find_mov_xrefs, find_push_xrefs, get_struct_string_candidates
from floss.language.rust.decode_utf8 import extract_utf8_strings

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,18 +62,14 @@ def fix_b2s_wide_strings(


def filter_and_transform_utf8_strings(
strings: List[Tuple[str, str, Tuple[int, int], bool]],
strings: List[Tuple[str, int, int]],
start_rdata: int,
) -> List[StaticString]:
transformed_strings = []

for string in strings:
s = string[0]
string_type = string[1]
start = string[2][0] + start_rdata

if string_type != "UTF8":
continue
start = string[1] + start_rdata

# our static algorithm does not extract new lines either
s = s.replace("\n", "")
Expand Down Expand Up @@ -138,12 +135,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
pointer_to_raw_data = rdata_section.PointerToRawData
buffer_rdata = rdata_section.get_data()

# extract utf-8 and wide strings, latter not needed here
strings = b2s.extract_all_strings(buffer_rdata, min_length)
fixed_strings = fix_b2s_wide_strings(strings, min_length, buffer_rdata)
# extract utf-8 strings
strings = extract_utf8_strings(pe, min_length)

# select only UTF-8 strings and adjust offset
static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata)
static_strings = filter_and_transform_utf8_strings(strings, start_rdata)

struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe))

Expand Down
4 changes: 2 additions & 2 deletions tests/test_language_rust_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ def test_language_detection_64(binary_file):
with contextlib.redirect_stdout(None):
out = get_extract_stats(pe, all_ss_strings, rust_strings, n)

# check that the output percentage is greater than 88%
assert float(out) > 88
# check that the output percentage is greater than 86%
assert float(out) > 86 # increase to 91 after merging PR #899
Arker123 marked this conversation as resolved.
Show resolved Hide resolved