diff --git a/UnityPy/streams/EndianBinaryReader.py b/UnityPy/streams/EndianBinaryReader.py index 53c96a50..a8b38862 100644 --- a/UnityPy/streams/EndianBinaryReader.py +++ b/UnityPy/streams/EndianBinaryReader.py @@ -2,29 +2,15 @@ import re from abc import ABC, ABCMeta, abstractmethod -from io import BufferedIOBase, BufferedReader, BytesIO, IOBase +from io import BytesIO, IOBase from struct import Struct, unpack -from typing import List, Optional, Union +from typing import BinaryIO, List, Optional, Union from ._defines import TYPE_PARAM_SIZE_LIST, Endianess reNot0 = re.compile(b"(.*?)\x00", re.S) -from ..math import Color, Matrix4x4, Quaternion, Rectangle, Vector2, Vector3, Vector4 - -# generate unpack and unpack_from functions - - -LOCALS = locals() -for endian_s, endian_l in (("<", "little"), (">", "big")): - for typ, param, _ in TYPE_PARAM_SIZE_LIST: - LOCALS[f"unpack_{endian_l}_{typ}"] = Struct(f"{endian_s}{param}").unpack - LOCALS[f"unpack_{endian_l}_{typ}_from"] = Struct( - f"{endian_s}{param}" - ).unpack_from - - class EndianBinaryReader(ABC, metaclass=ABCMeta): endian: Endianess Length: int @@ -33,35 +19,56 @@ class EndianBinaryReader(ABC, metaclass=ABCMeta): def __new__( cls, - item: Union[bytes, bytearray, memoryview, BytesIO, str], + item: Union[bytes, bytearray, memoryview, BytesIO, str, IOBase], endian: Endianess = ">", offset: int = 0, ): + """ + Creates a new instance of EndianBinaryReader, choosing the appropriate subclass based on the type of the input. + + Args: + item: The data source (bytes, memoryview, file path, or stream). + endian: Endianness to use for reading data ('>' for big-endian, '<' for little-endian). + offset: Initial offset to set for reading. + + Returns: + An instance of either EndianBinaryReader_Memoryview or EndianBinaryReader_Streamable. + + Raises: + ValueError: If the provided `item` type is unsupported. + """ if isinstance(item, (bytes, bytearray, memoryview)): - obj = super(EndianBinaryReader, cls).__new__(EndianBinaryReader_Memoryview) - elif isinstance(item, (IOBase, BufferedIOBase)): - obj = super(EndianBinaryReader, cls).__new__(EndianBinaryReader_Streamable) + # Handle in-memory binary data + obj = super().__new__(EndianBinaryReader_Memoryview) + elif isinstance(item, IOBase): # Includes BufferedIOBase + # Handle stream-like objects + obj = super().__new__(EndianBinaryReader_Streamable) elif isinstance(item, str): - item = open(item, "rb") - obj = super(EndianBinaryReader, cls).__new__(EndianBinaryReader_Streamable) + # Handle file paths as input + try: + item = open(item, "rb") + obj = super().__new__(EndianBinaryReader_Streamable) + except FileNotFoundError as e: + raise ValueError(f"File not found: {item}") from e elif isinstance(item, EndianBinaryReader): - item = ( + # Wrap another EndianBinaryReader instance + new_item = ( item.stream if isinstance(item, EndianBinaryReader_Streamable) else item.view ) - return EndianBinaryReader(item, endian, offset) + return cls(new_item, endian, offset) elif hasattr(item, "read"): - if hasattr(item, "seek") and hasattr(item, "tell"): - obj = super(EndianBinaryReader, cls).__new__( - EndianBinaryReader_Streamable - ) + # Handle generic objects with a `read` method + if hasattr(item, "seek"): + obj = super().__new__(EndianBinaryReader_Streamable) else: item = item.read() - obj = super(EndianBinaryReader, cls).__new__( - EndianBinaryReader_Memoryview - ) + obj = super().__new__(EndianBinaryReader_Memoryview) + else: + raise ValueError(f"Unsupported input type: {type(item).__name__}") + # Initialize the chosen subclass obj.__init__(item, endian) return obj @@ -70,7 +77,7 @@ def __init__(self, item, endian=">", offset=0): self.BaseOffset = offset self.Position = 0 - def seek(self, offset: int, whence: int = 0): + def seek(self, offset: int, whence: int = 0) -> int: if whence == 0: self.Position = offset elif whence == 1: @@ -79,6 +86,7 @@ def seek(self, offset: int, whence: int = 0): self.Position = self.Length + offset else: raise ValueError("Invalid whence") + return self.Position def tell(self) -> int: return self.Position @@ -92,12 +100,10 @@ def create_sub_reader(self, offset: int, length: int) -> EndianBinaryReader: @property def bytes(self): - # implemented by Streamable and Memoryview versions - return b"" + raise NotImplementedError(f"{self.__class__.__name__}.read not implemented!") def read(self, *args): - # implemented by Streamable and Memoryview versions - return b"" + raise NotImplementedError(f"{self.__class__.__name__}.read not implemented!") def read_byte(self) -> int: return unpack(self.endian + "b", self.read(1))[0] @@ -160,47 +166,11 @@ def read_aligned_string(self) -> str: def align_stream(self, alignment=4): self.Position += (alignment - self.Position % alignment) % alignment - - def read_quaternion(self) -> Quaternion: - return Quaternion( - self.read_float(), self.read_float(), self.read_float(), self.read_float() - ) - - def read_vector2(self) -> Vector2: - return Vector2(self.read_float(), self.read_float()) - - def read_vector3(self) -> Vector3: - return Vector3(self.read_float(), self.read_float(), self.read_float()) - - def read_vector4(self) -> Vector4: - return Vector4( - self.read_float(), self.read_float(), self.read_float(), self.read_float() - ) - - def read_rectangle_f(self) -> Rectangle: - return Rectangle( - self.read_float(), self.read_float(), self.read_float(), self.read_float() - ) - - def read_color_uint(self): - r = self.read_u_byte() - g = self.read_u_byte() - b = self.read_u_byte() - a = self.read_u_byte() - - return Color(r / 255.0, g / 255.0, b / 255.0, a / 255.0) - - def read_color4(self) -> Color: - return Color( - self.read_float(), self.read_float(), self.read_float(), self.read_float() - ) + return self.Position def read_byte_array(self) -> bytes: return self.read(self.read_int()) - def read_matrix(self) -> Matrix4x4: - return Matrix4x4(self.read_float_array(16)) - def read_array(self, command, length: int) -> list: return [command() for _ in range(length)] @@ -234,29 +204,12 @@ def read_long_array(self, length: int = None) -> List[int]: def read_u_long_array(self, length: int = None) -> List[int]: return self.read_array_struct("Q", length) - def read_u_int_array_array(self, length: int = None) -> List[List[int]]: - return self.read_array( - self.read_u_int_array, length if length is not None else self.read_int() - ) - def read_float_array(self, length: int = None) -> List[float]: return self.read_array_struct("f", length) def read_double_array(self, length: int = None) -> List[float]: return self.read_array_struct("d", length) - def read_string_array(self) -> List[str]: - return self.read_array(self.read_aligned_string, self.read_int()) - - def read_vector2_array(self) -> List[Vector2]: - return self.read_array(self.read_vector2, self.read_int()) - - def read_vector4_array(self) -> List[Vector4]: - return self.read_array(self.read_vector4, self.read_int()) - - def read_matrix_array(self) -> List[Matrix4x4]: - return self.read_array(self.read_matrix, self.read_int()) - def real_offset(self) -> int: """Returns offset in the underlying file. (Not working with unpacked streams.) @@ -287,17 +240,14 @@ def endian(self): return self._endian @endian.setter - def endian(self, value: str): + def endian(self, value): if value not in ("<", ">"): raise ValueError("Invalid endian") if value != self._endian: - setattr( - self, - "__class__", - EndianBinaryReader_Memoryview_LittleEndian - if value == "<" - else EndianBinaryReader_Memoryview_BigEndian, - ) + for typ, _, _ in TYPE_PARAM_SIZE_LIST: + func_name_e = f"read_{value}_{typ}" + func_name = f"read_{typ}" + setattr(self, func_name, getattr(self, func_name_e)) self._endian = value @property @@ -342,133 +292,9 @@ def create_sub_reader(self, offset: int, length: int) -> EndianBinaryReader: ) -class EndianBinaryReader_Memoryview_LittleEndian(EndianBinaryReader_Memoryview): - def read_u_short(self): - (ret,) = unpack_little_u_short_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_short(self): - (ret,) = unpack_little_short_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_int(self): - (ret,) = unpack_little_int_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_u_int(self): - (ret,) = unpack_little_u_int_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_long(self): - (ret,) = unpack_little_long_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_u_long(self): - (ret,) = unpack_little_u_long_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_half(self): - (ret,) = unpack_little_half_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_float(self): - (ret,) = unpack_little_float_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_double(self): - (ret,) = unpack_little_double_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_vector2(self): - (x, y) = unpack_little_vector2_from(self.view, self.Position) - self.Position += 8 - return Vector2(x, y) - - def read_vector3(self): - (x, y, z) = unpack_little_vector3_from(self.view, self.Position) - self.Position += 12 - return Vector3(x, y, z) - - def read_vector4(self): - (x, y, z, w) = unpack_little_vector4_from(self.view, self.Position) - self.Position += 16 - return Vector4(x, y, z, w) - - -class EndianBinaryReader_Memoryview_BigEndian(EndianBinaryReader_Memoryview): - def read_u_short(self): - (ret,) = unpack_big_u_short_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_short(self): - (ret,) = unpack_big_short_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_int(self): - (ret,) = unpack_big_int_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_u_int(self): - (ret,) = unpack_big_u_int_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_long(self): - (ret,) = unpack_big_long_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_u_long(self): - (ret,) = unpack_big_u_long_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_half(self): - (ret,) = unpack_big_half_from(self.view, self.Position) - self.Position += 2 - return ret - - def read_float(self): - (ret,) = unpack_big_float_from(self.view, self.Position) - self.Position += 4 - return ret - - def read_double(self): - (ret,) = unpack_big_double_from(self.view, self.Position) - self.Position += 8 - return ret - - def read_vector2(self): - (x, y) = unpack_big_vector2_from(self.view, self.Position) - self.Position += 8 - return Vector2(x, y) - - def read_vector3(self): - (x, y, z) = unpack_big_vector3_from(self.view, self.Position) - self.Position += 12 - return Vector3(x, y, z) - - def read_vector4(self): - (x, y, z, w) = unpack_big_vector4_from(self.view, self.Position) - self.Position += 16 - return Vector4(x, y, z, w) - - class EndianBinaryReader_Streamable(EndianBinaryReader): __slots__ = ("stream", "_endian", "BaseOffset") - stream: BufferedReader + stream: BinaryIO def __init__(self, stream, endian=">", offset=0): self._endian = "" @@ -491,13 +317,10 @@ def endian(self, value): if value not in ("<", ">"): raise ValueError("Invalid endian") if value != self._endian: - setattr( - self, - "__class__", - EndianBinaryReader_Streamable_LittleEndian - if value == "<" - else EndianBinaryReader_Streamable_BigEndian, - ) + for typ, _, _ in TYPE_PARAM_SIZE_LIST: + func_name_e = f"read_{value}_{typ}" + func_name = f"read_{typ}" + setattr(self, func_name, getattr(self, func_name_e)) self._endian = value @property @@ -525,77 +348,23 @@ def create_sub_reader(self, offset: int, length: int) -> EndianBinaryReader: return EndianBinaryReader_Streamable(self.stream, self.endian, offset) -class EndianBinaryReader_Streamable_LittleEndian(EndianBinaryReader_Streamable): - def read_u_short(self): - return unpack_little_u_short(self.read(2))[0] - - def read_short(self): - return unpack_little_short(self.read(2))[0] - - def read_int(self): - return unpack_little_int(self.read(4))[0] - - def read_u_int(self): - return unpack_little_u_int(self.read(4))[0] - - def read_long(self): - return unpack_little_long(self.read(8))[0] - - def read_u_long(self): - return unpack_little_u_long(self.read(8))[0] - - def read_half(self): - return unpack_little_half(self.read(2))[0] - - def read_float(self): - return unpack_little_float(self.read(4))[0] - - def read_double(self): - return unpack_little_double(self.read(8))[0] - - def read_vector2(self): - return Vector2(*unpack_little_vector2(self.read(8))) - - def read_vector3(self): - return Vector3(*unpack_little_vector3(self.read(12))) - - def read_vector4(self): - return Vector4(*unpack_little_vector4(self.read(16))) - - -class EndianBinaryReader_Streamable_BigEndian(EndianBinaryReader_Streamable): - def read_u_short(self): - return unpack_big_u_short(self.read(2))[0] - - def read_short(self): - return unpack_big_short(self.read(2))[0] - - def read_int(self): - return unpack_big_int(self.read(4))[0] - - def read_u_int(self): - return unpack_big_u_int(self.read(4))[0] - - def read_long(self): - return unpack_big_long(self.read(8))[0] - - def read_u_long(self): - return unpack_big_u_long(self.read(8))[0] - - def read_half(self): - return unpack_big_half(self.read(2))[0] +# generate endianed functions +for endian_s in Endianess.__args__: + for typ, param, _ in TYPE_PARAM_SIZE_LIST: - def read_float(self): - return unpack_big_float(self.read(4))[0] + def generate_funcs(): + func_name = f"read_{endian_s}_{typ}" + struct = Struct(f"{endian_s}{param}") - def read_double(self): - return unpack_big_double(self.read(8))[0] + def mv_func(self: EndianBinaryReader_Memoryview): + value = struct.unpack_from(self.view, self.Position)[0] + self.Position += struct.size + return value - def read_vector2(self): - return Vector2(*unpack_big_vector2(self.read(8))) + def st_func(self: EndianBinaryReader_Streamable): + return struct.unpack(self.stream)[0] - def read_vector3(self): - return Vector3(*unpack_big_vector3(self.read(12))) + setattr(EndianBinaryReader_Memoryview, func_name, mv_func) + setattr(EndianBinaryReader_Streamable, func_name, st_func) - def read_vector4(self): - return Vector4(*unpack_big_vector4(self.read(16))) + generate_funcs() diff --git a/UnityPy/streams/EndianBinaryReader.pyi b/UnityPy/streams/EndianBinaryReader.pyi new file mode 100644 index 00000000..af756b7f --- /dev/null +++ b/UnityPy/streams/EndianBinaryReader.pyi @@ -0,0 +1,63 @@ +from __future__ import annotations +from abc import ABC, ABCMeta, abstractmethod +from typing import List, Optional, BinaryIO + +from ._defines import Endianess + +class EndianBinaryReader(ABC, BinaryIO, metaclass=ABCMeta): + endian: Endianess + Length: int + Position: int + BaseOffset: int + + def __init__(self, item, endian=">", offset=0): ... + def seek(self, offset: int, whence: int = 0) -> int: ... + def tell(self) -> int: ... + def get_bytes(self) -> bytes: ... + @abstractmethod + def create_sub_reader(self, offset: int, length: int) -> EndianBinaryReader: + pass + + @property + def bytes(self) -> bytes: ... + def read(self, *args) -> bytes: ... + def read_byte(self) -> int: ... + def read_u_byte(self) -> int: ... + def read_bytes(self, num: int) -> bytes: ... + def read_short(self) -> int: ... + def read_int(self) -> int: ... + def read_long(self) -> int: ... + def read_u_short(self) -> int: ... + def read_u_int(self) -> int: ... + def read_u_long(self) -> int: ... + def read_float(self) -> float: ... + def read_double(self) -> float: ... + def read_boolean(self) -> bool: ... + def read_string_to_null(self, max_length=32767) -> str: ... + def read_string( + self, size: Optional[int] = None, encoding: str = "utf8" + ) -> str: ... + def read_aligned_string(self) -> str: ... + def align_stream(self, alignment=4) -> int: ... + def read_byte_array(self) -> bytes: ... + def read_array(self, command, length: int) -> list: ... + def read_array_struct(self, param: str, length: int = None) -> List[tuple]: ... + def read_boolean_array(self, length: int = None) -> List[bool]: ... + def read_u_byte_array(self, length: int = None) -> List[int]: ... + def read_u_short_array(self, length: int = None) -> List[int]: ... + def read_short_array(self, length: int = None) -> List[int]: ... + def read_int_array(self, length: int = None) -> List[int]: ... + def read_u_int_array(self, length: int = None) -> List[int]: ... + def read_long_array(self, length: int = None) -> List[int]: ... + def read_u_long_array(self, length: int = None) -> List[int]: ... + def read_float_array(self, length: int = None) -> List[float]: ... + def read_double_array(self, length: int = None) -> List[float]: ... + def real_offset(self) -> int: ... + def read_the_rest(self, obj_start: int, obj_size: int) -> bytes: ... + def unpack_array(self, string: str, count: int) -> list: ... + +class EndianBinaryReader_Memoryview(EndianBinaryReader): + view: memoryview + +class EndianBinaryReader_Streamable(EndianBinaryReader): + stream: BinaryIO diff --git a/UnityPy/streams/EndianBinaryWriter.py b/UnityPy/streams/EndianBinaryWriter.py index 403696ab..923638a5 100644 --- a/UnityPy/streams/EndianBinaryWriter.py +++ b/UnityPy/streams/EndianBinaryWriter.py @@ -1,6 +1,6 @@ -import io +from io import BytesIO from struct import pack -from typing import Union +from typing import BinaryIO, Union from ..math import Color, Matrix4x4, Quaternion, Rectangle, Vector2, Vector3, Vector4 from ._defines import Endianess @@ -10,15 +10,15 @@ class EndianBinaryWriter: endian: Endianess Length: int Position: int - stream: io.BufferedReader + stream: BinaryIO def __init__( - self, input_: Union[bytes, bytearray, io.IOBase] = b"", endian: Endianess = ">" + self, input_: Union[bytes, bytearray, BinaryIO] = b"", endian: Endianess = ">" ): if isinstance(input_, (bytes, bytearray)): - self.stream = io.BytesIO(input_) + self.stream = BytesIO(input_) self.stream.seek(0, 2) - elif isinstance(input_, io.IOBase): + elif isinstance(input_, BinaryIO): self.stream = input_ else: raise ValueError("Invalid input type - %s." % type(input_)) @@ -50,9 +50,9 @@ def bytes(self): def Length(self) -> int: pos = self.stream.tell() self.stream.seek(0, 2) - l = self.stream.tell() + length = self.stream.tell() self.stream.seek(pos) - return l + return length def dispose(self): self.stream.close() @@ -119,49 +119,6 @@ def align_stream(self, alignment=4): align = (alignment - pos % alignment) % alignment self.write(b"\0" * align) - def write_quaternion(self, value: Quaternion): - self.write_float(value.X) - self.write_float(value.Y) - self.write_float(value.Z) - self.write_float(value.W) - - def write_vector2(self, value: Vector2): - self.write_float(value.X) - self.write_float(value.Y) - - def write_vector3(self, value: Vector3): - self.write_float(value.X) - self.write_float(value.Y) - self.write_float(value.Z) - - def write_vector4(self, value: Vector4): - self.write_float(value.X) - self.write_float(value.Y) - self.write_float(value.Z) - self.write_float(value.W) - - def write_rectangle_f(self, value: Rectangle): - self.write_float(value.x) - self.write_float(value.y) - self.write_float(value.width) - self.write_float(value.height) - - def write_color_uint(self, value: Color): - self.write_u_byte(value.R * 255) - self.write_u_byte(value.G * 255) - self.write_u_byte(value.B * 255) - self.write_u_byte(value.A * 255) - - def write_color4(self, value: Color): - self.write_float(value.R) - self.write_float(value.G) - self.write_float(value.B) - self.write_float(value.A) - - def write_matrix(self, value: Matrix4x4): - for val in value.M: - self.write_float(val) - def write_array(self, command, value: list, write_length: bool = True): if write_length: self.write_int(len(value)) @@ -186,15 +143,3 @@ def write_u_int_array(self, value: list, write_length: bool = False): def write_float_array(self, value: list, write_length: bool = False): return self.write_array(self.write_float, value, write_length) - - def write_string_array(self, value: list): - self.write_array(self.write_aligned_string, value) - - def write_vector2_array(self, value: list): - self.write_array(self.write_vector2, value) - - def write_vector4_array(self, value: list): - self.write_array(self.write_vector4, value) - - def write_matrix_array(self, value: list): - self.write_array(self.write_matrix, value) diff --git a/UnityPy/streams/_defines.py b/UnityPy/streams/_defines.py index cf6f8d1d..0a2dd828 100644 --- a/UnityPy/streams/_defines.py +++ b/UnityPy/streams/_defines.py @@ -16,7 +16,4 @@ ("half", "e", 2), ("float", "f", 4), ("double", "d", 8), - ("vector2", "2f", 8), - ("vector3", "3f", 12), - ("vector4", "4f", 16), ]