Skip to content

Commit

Permalink
refactor(yaffs): use FileSystem to report problems during extraction
Browse files Browse the repository at this point in the history
In the newer version absolute symlinks are no longer dropped as path
traversals, but changed to point within the extraction directory.
  • Loading branch information
e3krisztian authored and qkaiser committed Aug 11, 2023
1 parent 75eae88 commit 01990c9
Show file tree
Hide file tree
Showing 75 changed files with 91 additions and 58 deletions.
75 changes: 17 additions & 58 deletions unblob/handlers/filesystem/yaffs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import itertools
import os
from collections import defaultdict
from enum import IntEnum
from pathlib import Path
Expand All @@ -12,17 +11,17 @@
from treelib import Tree
from treelib.exceptions import NodeIDAbsentError

from unblob.extractor import is_safe_path
from unblob.file_utils import (
Endian,
File,
FileSystem,
InvalidInputFormat,
StructParser,
get_endian_multi,
read_until_past,
snull,
)
from unblob.models import Extractor, Handler, HexString, ValidChunk
from unblob.models import Extractor, ExtractResult, Handler, HexString, ValidChunk

logger = get_logger()

Expand Down Expand Up @@ -470,84 +469,42 @@ def resolve_path(self, entry: YAFFSEntry) -> Path:
return self.resolve_path(parent_entry).joinpath(resolved_path)
return resolved_path

def get_file_bytes(self, entry: YAFFSEntry) -> Iterable[bytes]:
def get_file_chunks(self, entry: YAFFSEntry) -> Iterable[bytes]:
for chunk in self.get_chunks(entry.object_id):
yield self.file[chunk.offset : chunk.offset + chunk.byte_count]

def extract(self, outdir: Path):
def extract(self, fs: FileSystem):
for entry in [
self.file_entries.get_node(node)
for node in self.file_entries.expand_tree(mode=Tree.DEPTH)
]:
if entry is None or entry.data is None:
continue
self.extract_entry(entry.data, outdir)
self.extract_entry(entry.data, fs)

def extract_entry(self, entry: YAFFSEntry, outdir: Path): # noqa: C901
def extract_entry(self, entry: YAFFSEntry, fs: FileSystem):
if entry.object_type == YaffsObjectType.UNKNOWN:
logger.warning("unknown type entry", entry=entry)
logger.warning("unknown entry type", entry=entry)
return

entry_path = self.resolve_path(entry)

if not is_safe_path(outdir, entry_path):
logger.warning(
"Potential path traversal attempt", outdir=outdir, path=entry_path
)
return

out_path = outdir.joinpath(entry_path)
out_path = self.resolve_path(entry)

if entry.object_type == YaffsObjectType.SPECIAL:
if not isinstance(entry, YAFFS2Entry):
logger.warning("non YAFFS2 special object", entry=entry)
return

if os.geteuid() == 0:
logger.debug(
"creating special file", special_path=out_path, _verbosity=3
)
os.mknod(out_path.as_posix(), entry.st_mode, entry.st_rdev)
else:
logger.warning(
"creating special files requires elevated privileges, skipping.",
path=out_path,
st_mode=entry.st_mode,
st_rdev=entry.st_rdev,
)
return

if entry.object_type == YaffsObjectType.DIRECTORY:
logger.debug("creating directory", dir_path=out_path, _verbosity=3)
out_path.mkdir(exist_ok=True)
fs.mknod(out_path, entry.st_mode, entry.st_rdev)
elif entry.object_type == YaffsObjectType.DIRECTORY:
fs.mkdir(out_path, exist_ok=True)
elif entry.object_type == YaffsObjectType.FILE:
logger.debug("creating file", file_path=out_path, _verbosity=3)
with out_path.open("wb") as f:
for chunk in self.get_file_bytes(entry):
f.write(chunk)
fs.write_chunks(out_path, self.get_file_chunks(entry))
elif entry.object_type == YaffsObjectType.SYMLINK:
if not is_safe_path(outdir, out_path.parent / Path(entry.alias)):
logger.warning(
"Potential path traversal attempt through symlink",
outdir=outdir,
path=entry.alias,
)
return
logger.debug("creating symlink", file_path=out_path, _verbosity=3)
out_path.symlink_to(Path(entry.alias))
fs.create_symlink(src=Path(entry.alias), dst=out_path)
elif entry.object_type == YaffsObjectType.HARDLINK:
logger.debug("creating hardlink", file_path=out_path, _verbosity=3)
dst_entry = self.file_entries[entry.equiv_id].data
dst_path = self.resolve_path(dst_entry)
if not is_safe_path(outdir, dst_path):
logger.warning(
"Potential path traversal attempt through hardlink",
outdir=outdir,
path=dst_path,
)
return
dst_full_path = outdir / dst_path
dst_full_path.link_to(out_path)
fs.create_hardlink(src=dst_path, dst=out_path)


class YAFFS2Parser(YAFFSParser):
Expand Down Expand Up @@ -765,7 +722,9 @@ def extract(self, inpath: Path, outdir: Path):
infile = File.from_path(inpath)
parser = instantiate_parser(infile)
parser.parse(store=True)
parser.extract(outdir)
fs = FileSystem(outdir)
parser.extract(fs)
return ExtractResult(reports=list(fs.problems))


class YAFFSHandler(Handler):
Expand Down

0 comments on commit 01990c9

Please sign in to comment.