Implement Custom UTF-8 Decoder #885

Open
wants to merge 18 commits into master
131 changes: 131 additions & 0 deletions floss/language/rust/decode_utf8.py
@@ -0,0 +1,131 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
import sys
import pathlib
import argparse
from typing import Any, List, Tuple, Iterable, Optional
from collections import namedtuple

import pefile

import floss.logging_
from floss.language.utils import get_rdata_section

MIN_STR_LEN = 4

logger = floss.logging_.getLogger(__name__)


def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[Any]]:
"""
Extracts UTF-8 strings from a buffer.
"""

# Reference: https://en.wikipedia.org/wiki/UTF-8

character_info = namedtuple("character_info", ["character", "position", "length"])
character_and_index = []

    # walk the buffer byte by byte, classify each position by its UTF-8
    # leading-byte pattern, and skip over the continuation bytes that follow
    i = 0
    while i < len(buf):
        # 1-byte sequence (ASCII): 0xxxxxxx
        if buf[i] & 0x80 == 0x00:
            # "ignore" is used because decode() throws an exception when a byte
            # satisfies the leading-byte check but is not a valid UTF-8 character
            character = buf[i].to_bytes(1, "big").decode("utf-8", "ignore")
            character_and_index.append(character_info(character, i, 1))
            i += 1

        # 2-byte sequence: 110xxxxx 10xxxxxx
        elif buf[i] & 0xE0 == 0xC0:
            temp = buf[i] << 8 | buf[i + 1]
            character = temp.to_bytes(2, "big").decode("utf-8", "ignore")
Collaborator:

why do you use ignore here? wouldn't we want to handle the case that invalid UTF-8 data is encountered (and not extract a string there)?

I assume that your algorithm works pretty well, since you've opened the PR, but I can't quite follow how it works. Would you please add some comments explaining the design, and definitely a few test cases that exercise each of the branch arms?

Collaborator (Author):

Hi, the tests for each branch are in tests/test_utf8_decoder.py. Let me know if anything else is required.
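For reference, a minimal sketch of the stricter handling the reviewer describes: reject a sequence outright instead of decoding it with errors="ignore". The decode_strict helper below is hypothetical and not part of this PR; it assumes the same leading-byte classification used above.

def decode_strict(buf: bytes, i: int, length: int):
    # strict variant: validate continuation bytes and decode without "ignore",
    # so invalid data yields None rather than an empty or mangled character
    seq = buf[i : i + length]
    if len(seq) < length or any(b & 0xC0 != 0x80 for b in seq[1:]):
        return None  # truncated sequence or bad continuation byte
    try:
        return seq.decode("utf-8")
    except UnicodeDecodeError:
        return None  # e.g. overlong encoding or surrogate code point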

            character_and_index.append(character_info(character, i, 2))
            i += 2

        # 3-byte sequence: 1110xxxx 10xxxxxx 10xxxxxx
        elif buf[i] & 0xF0 == 0xE0:
            temp = buf[i] << 16 | buf[i + 1] << 8 | buf[i + 2]
            character = temp.to_bytes(3, "big").decode("utf-8", "ignore")
            character_and_index.append(character_info(character, i, 3))
            i += 3

        # 4-byte sequence: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
        elif buf[i] & 0xF8 == 0xF0:
            temp = buf[i] << 24 | buf[i + 1] << 16 | buf[i + 2] << 8 | buf[i + 3]
            character = temp.to_bytes(4, "big").decode("utf-8", "ignore")
            character_and_index.append(character_info(character, i, 4))
            i += 4

        else:
            logger.trace("Invalid UTF-8 character at offset %d", i)
            i += 1

    # group consecutive printable characters into candidate strings,
    # tracking each candidate as [text, start offset, end offset]
    prev = False
    strings = []

    for char_info in character_and_index:
        if char_info.character.isprintable():
            if not prev:
                # start a new candidate string
                strings.append([char_info.character, char_info.position, char_info.position])
                prev = True
            else:
                # extend the current candidate and move its end offset forward
                strings[-1][0] += char_info.character
                strings[-1][2] = char_info.position
        else:
            prev = False

    # discard candidates shorter than the minimum length
    strings = [string for string in strings if len(string[0]) >= min_length]

return strings


def extract_rdata_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]:
"""
Extracts UTF-8 strings from the .rdata section of a PE file.
"""
try:
rdata_section = get_rdata_section(pe)
except ValueError as e:
logger.error("cannot extract rust strings: %s", e)
return []

buf = pe.get_memory_mapped_image()[
rdata_section.VirtualAddress : rdata_section.VirtualAddress + rdata_section.SizeOfRawData
]
strings = extract_utf8_strings_from_buffer(buf, min_length)
return strings


def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]:
"""
Extracts UTF-8 strings from a PE file.
"""
# Can be extended to extract strings from other sections
return extract_rdata_utf8_strings(pe, min_length)


def main(argv=None):
parser = argparse.ArgumentParser(description="Get Rust strings")
parser.add_argument("path", help="file or path to analyze")
parser.add_argument(
"-n",
"--minimum-length",
dest="min_length",
type=int,
default=MIN_STR_LEN,
help="minimum string length",
)
args = parser.parse_args(args=argv)

    path = pathlib.Path(args.path)
    buf = path.read_bytes()
    pe = pefile.PE(data=buf, fast_load=True)

strings = extract_utf8_strings(pe, args.min_length)
print(strings)
for string in strings:
print(string[0])


if __name__ == "__main__":
sys.exit(main())
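To make the return shape concrete, a small usage sketch (not part of the diff) against the new module; the buffer contents and offsets here are illustrative only.

from floss.language.rust.decode_utf8 import extract_utf8_strings_from_buffer

# each result is a list of [string, start offset, end offset], offsets relative to the buffer
buf = b"Hello, world!\x00\xff\xfejunk"
for text, start, end in extract_utf8_strings_from_buffer(buf, min_length=4):
    print(f"{start:#x}-{end:#x}: {text}")
# prints "0x0-0xc: Hello, world!" and then "0x10-0x13: junk"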
21 changes: 6 additions & 15 deletions floss/language/rust/extract.py
@@ -4,7 +4,7 @@
import pathlib
import argparse
import itertools
from typing import List, Tuple, Iterable, Optional
from typing import Any, List, Tuple, Iterable, Optional

import pefile
import binary2strings as b2s
@@ -17,6 +17,7 @@
get_rdata_section,
get_struct_string_candidates,
)
from floss.language.rust.decode_utf8 import extract_utf8_strings

logger = logging.getLogger(__name__)

@@ -59,18 +60,14 @@ def fix_b2s_wide_strings(


def filter_and_transform_utf8_strings(
strings: List[Tuple[str, str, Tuple[int, int], bool]],
strings: List[List[Any]],
start_rdata: int,
) -> List[StaticString]:
transformed_strings = []

for string in strings:
s = string[0]
string_type = string[1]
start = string[2][0] + start_rdata

if string_type != "UTF8":
continue
start = string[1] + start_rdata

# our static algorithm does not extract new lines either
s = s.replace("\n", "")
@@ -150,18 +147,12 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
pointer_to_raw_data = rdata_section.PointerToRawData
buffer_rdata = rdata_section.get_data()

# extract utf-8 and wide strings, latter not needed here
strings = b2s.extract_all_strings(buffer_rdata, min_length)
fixed_strings = fix_b2s_wide_strings(strings, min_length, buffer_rdata)
# extract utf-8 strings
fixed_strings = extract_utf8_strings(pe, min_length)

# select only UTF-8 strings and adjust offset
static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata)

# TODO(mr-tz) - handle miss in rust-hello64.exe
# .rdata:00000001400C1270 0A aPanickedAfterP db 0Ah ; DATA XREF: .rdata:00000001400C12B8↓o
# .rdata:00000001400C1271 70 61 6E 69 63 6B 65 64… db 'panicked after panic::always_abort(), aborting.',0Ah,0
# .rdata:00000001400C12A2 00 00 00 00 00 00 align 8

struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe))

if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]:
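For context on the change to filter_and_transform_utf8_strings above: binary2strings yields tuples with an encoding tag and a nested offset pair, while the custom decoder yields flat [string, start, end] lists, so the UTF-8 filter goes away and the offset lookup moves from string[2][0] to string[1]. A sketch of the two shapes, with illustrative values only:

# old shape (binary2strings): encoding tag to filter on, nested (start, end) offsets
b2s_result = ("Hello, world!", "UTF8", (0xB030, 0xB03D), False)
start = b2s_result[2][0]      # string[2][0] in the old code

# new shape (custom decoder): UTF-8 only, flat start/end offsets
decoder_result = ["Hello, world!", 0xB030, 0xB03D]
start = decoder_result[1]     # string[1] in the new code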
4 changes: 2 additions & 2 deletions tests/test_language_rust_coverage.py
@@ -53,5 +53,5 @@ def test_language_detection_64(binary_file):
with contextlib.redirect_stdout(None):
out = get_extract_stats(pe, all_ss_strings, rust_strings, n)

# check that the output percentage is greater than 88%
assert float(out) > 88
# check that the output percentage is greater than 86%
assert float(out) > 86 # TODO(Arker123): increase to 91 after merging PR #899
30 changes: 30 additions & 0 deletions tests/test_utf8_decoder.py
@@ -0,0 +1,30 @@
import pathlib

import pytest

from floss.results import StaticString, StringEncoding
from floss.language.rust.extract import extract_rust_strings


@pytest.fixture(scope="module")
def rust_strings64():
n = 1
path = pathlib.Path(__file__).parent / "data" / "language" / "rust" / "rust-hello" / "bin" / "rust-hello64.exe"
return extract_rust_strings(path, n)


@pytest.mark.parametrize(
"string,offset,encoding,rust_strings",
[
        # exercises the 1-byte (ASCII) decoding branch
pytest.param("Hello, world!", 0xBB030, StringEncoding.UTF8, "rust_strings64"),
        # exercises the 2-byte decoding branch
pytest.param("۶ж̶ƶ", 0xC73E3, StringEncoding.UTF8, "rust_strings64"),
        # exercises the 3-byte decoding branch
pytest.param("jd8n8n헧??", 0xD3CE2, StringEncoding.UTF8, "rust_strings64"),
        # exercises the 4-byte decoding branch
pytest.param("&ޓޓttt", 0xD41F8, StringEncoding.UTF8, "rust_strings64"),
],
)
def test_utf8_decoder(request, string, offset, encoding, rust_strings):
assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings)