diff --git a/arkprts/assets/bundle.py b/arkprts/assets/bundle.py index ecd11d5..d9f2e85 100644 --- a/arkprts/assets/bundle.py +++ b/arkprts/assets/bundle.py @@ -62,8 +62,8 @@ def load_unity_file(stream: io.BytesIO | bytes) -> bytes: return asset_file -def decrypt_global_text(data: bytes, *, rsa: bool = True) -> bytes: - """Decrypt global text.""" +def decrypt_aes_text(data: bytes, *, rsa: bool = True) -> bytes: + """Decrypt AES text.""" from Crypto.Cipher import AES mask = bytes.fromhex("554954704169383270484157776e7a7148524d4377506f6e4a4c49423357436c") @@ -86,6 +86,7 @@ def run_flatbuffers( output_directory: PathLike, ) -> pathlib.Path: """Run the flatbuffers cli. Returns the output filename.""" + stderr = tempfile.TemporaryFile() code = subprocess.call( [ # noqa: S603 # check for execution of untrusted input "flatc", @@ -100,31 +101,38 @@ def run_flatbuffers( "--defaults-json", "--unknown-json", "--raw-binary", - "--no-warnings", + # unfortunately not in older versions + # "--no-warnings", "--force-empty", ], + stderr=stderr, ) if code != 0: + stderr.seek(0) + LOGGER.error(stderr.read().decode("utf-8", "replace")) raise ValueError(f"flatc failed with code {code}") return pathlib.Path(output_directory) / (pathlib.Path(fbs_path).stem + ".json") -def resolve_fbs_schema_directory() -> pathlib.Path: +def resolve_fbs_schema_directory(server: typing.Literal["cn", "yostar"]) -> pathlib.Path: """Resolve the flatbuffers schema directory.""" - path = os.environ.get("FLATBUFFERS_SCHEMA_DIR") + path = os.environ.get(f"FLATBUFFERS_SCHEMA_DIR_{server.upper()}") if path: return pathlib.Path(path) - path = pathlib.Path(tempfile.gettempdir()) / "OpenArknightsFBS" / "FBS" - os.environ["FLATBUFFERS_SCHEMA_DIR"] = str(path) + core_path = pathlib.Path(tempfile.gettempdir()) / "ArknightsFBS" + core_path.mkdir(parents=True, exist_ok=True) + path = core_path / server / "OpenArknightsFBS" / "FBS" + os.environ[f"FLATBUFFERS_SCHEMA_DIR_{server.upper()}"] = str(path) return path async def update_fbs_schema(*, force: bool = False) -> None:
"""Download or otherwise update FBS files.""" - directory = resolve_fbs_schema_directory().parent - await git.update_repository("MooncellWiki/OpenArknightsFBS", directory, force=force) + for server, branch in [("cn", "main"), ("yostar", "YoStar")]: + directory = resolve_fbs_schema_directory(server).parent + await git.update_repository("MooncellWiki/OpenArknightsFBS", directory, branch=branch, force=force) def recursively_collapse_keys(obj: typing.Any) -> typing.Any: @@ -143,8 +150,15 @@ def recursively_collapse_keys(obj: typing.Any) -> typing.Any: return obj -def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes: - """Decrypt chinese fbs json file.""" +def decrypt_fbs_file( + data: bytes, + table_name: str, + server: netn.ArknightsServer, + *, + rsa: bool = True, + normalize: bool = False, +) -> bytes: + """Decrypt fbs json file.""" if rsa: data = data[128:] @@ -153,7 +167,9 @@ def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes fbs_path = tempdir / (table_name + ".bytes") fbs_path.write_bytes(data) - fbs_schema_path = resolve_fbs_schema_directory() / (table_name + ".fbs") + fbs_schema_path = resolve_fbs_schema_directory(server="cn" if server in ("cn", "bili") else "yostar") / ( + table_name + ".fbs" + ) output_directory = tempdir / "output" output_path = run_flatbuffers(fbs_path, fbs_schema_path, output_directory) @@ -163,15 +179,22 @@ def decrypt_fbs_file(data: bytes, table_name: str, *, rsa: bool = True) -> bytes if len(parsed_data) == 1: parsed_data, *_ = parsed_data.values() - return json.dumps(parsed_data, indent=4, ensure_ascii=False).encode("utf-8") + return json.dumps(parsed_data, indent=4 if normalize else None, ensure_ascii=False).encode("utf-8") -def decrypt_arknights_text(data: bytes, name: str, *, rsa: bool = True) -> bytes: +def decrypt_arknights_text( + data: bytes, + name: str, + server: netn.ArknightsServer, + *, + rsa: bool = True, + normalize: bool = False, +) -> bytes: """Decrypt arbitrary 
arknights data.""" if match := re.search(r"(\w+_(?:table|data|const|database))[0-9a-fA-F]{6}", name): - return decrypt_fbs_file(data, match[1], rsa=rsa) + return decrypt_fbs_file(data, match[1], rsa=rsa, server=server, normalize=normalize) - return decrypt_global_text(data, rsa=rsa) + return decrypt_aes_text(data, rsa=rsa) def load_json_or_bson(data: bytes) -> typing.Any: @@ -184,8 +207,11 @@ def load_json_or_bson(data: bytes) -> typing.Any: return json.loads(data) -def normalize_json(data: bytes, *, indent: int = 4) -> bytes: +def normalize_json(data: bytes, *, indent: int = 4, lenient: bool = True) -> bytes: """Normalize a json format.""" + if lenient and b"\x00" not in data[:256]: + return data + json_data = load_json_or_bson(data) return json.dumps(json_data, indent=indent, ensure_ascii=False).encode("utf-8") @@ -197,6 +223,9 @@ def unpack_assets( asset: UnityPyAsset, target_container: str | None = None, # target_path: str | None = None, + *, + server: netn.ArknightsServer | None = None, + normalize: bool = False, ) -> typing.Iterable[tuple[str, bytes]]: """Yield relative paths and data for a unity asset.""" for container, obj in asset.container.items(): @@ -204,6 +233,9 @@ def unpack_assets( continue if obj.type.name == "TextAsset": + if server is None: + raise TypeError("Server required for text decryption") + if match := re.match(DYNP + r"(.+\.txt)", container): data = obj.read() yield (match[1], data.script) @@ -211,30 +243,37 @@ def unpack_assets( if match := re.match(DYNP + r"(gamedata/.+?\.json)", container): data = obj.read() - yield (match[1], normalize_json(bytes(data.script))) + yield (match[1], normalize_json(bytes(data.script), lenient=not normalize)) continue if match := re.match(DYNP + r"(gamedata/.+?)\.lua\.bytes", container): data = obj.read() - text = decrypt_arknights_text(data.script, name=data.name) + text = decrypt_aes_text(data.script) yield (match[1] + ".lua", text) continue if match := re.match(DYNP + 
r"(gamedata/levels/(?:obt|activities)/.+?)\.bytes", container): data = obj.read() try: - text = normalize_json(bytes(data.script)[128:]) + text = normalize_json(bytes(data.script)[128:], lenient=not normalize) except UnboundLocalError: # effectively bson's "type not recognized" error - text = decrypt_fbs_file(data.script, "prts___levels") + text = decrypt_fbs_file(data.script, "prts___levels", server=server) yield (match[1] + ".json", text) continue if match := re.match(DYNP + r"(gamedata/.+?)(?:[a-fA-F0-9]{6})?\.bytes", container): data = obj.read() - # the only rsa-less file is global's enemy_database - text = decrypt_arknights_text(data.script, name=data.name, rsa=data.name != "enemy_database") - yield (match[1] + ".json", normalize_json(text)) + # the only rsa-less file is ~~global~~ tw's enemy_database + + text = decrypt_arknights_text( + data.script, + name=data.name, + rsa=data.name != "enemy_database", + server=server, + normalize=normalize, + ) + yield (match[1] + ".json", normalize_json(text, lenient=not normalize)) continue @@ -332,6 +371,7 @@ async def _download_and_save( *, target_container: str | None = None, server: netn.ArknightsServer | None = None, + normalize: bool = False, ) -> typing.AsyncIterable[tuple[str, bytes]]: """Download and extract an asset.""" server = server or self.default_server @@ -339,7 +379,10 @@ async def _download_and_save( asset = await self._download_unity_asset(path, server=server) fetched_any = False - for fetched_any, (path, data) in enumerate(unpack_assets(asset, target_container), 1): + for fetched_any, (path, data) in enumerate( + unpack_assets(asset, target_container, server=server, normalize=normalize), + 1, + ): savepath = self.directory / server / path savepath.parent.mkdir(exist_ok=True, parents=True) savepath.write_bytes(data) @@ -367,9 +410,6 @@ async def update_assets( return - if server in ("cn", "bili"): - await update_fbs_schema() - hot_update_list = await self._get_hot_update_list(server) requested_names = 
[info["name"] for info in hot_update_list["abInfos"] if fnmatch.fnmatch(info["name"], allow)] @@ -378,6 +418,9 @@ async def update_assets( outdated_names = set(get_outdated_hashes(hot_update_list, old_hot_update_list)) requested_names = [name for name in requested_names if name in outdated_names] + if any("gamedata" in name for name in requested_names): + await update_fbs_schema() + # sequential doesn't matter since most of the time is spent unpacking # Fix this once images come into play (threadpoolexecutor and such) # first download all .ab files in a temporary directory then start extracting them. @@ -409,7 +452,7 @@ async def aget_file(self, path: str, *, server: netn.ArknightsServer | None = No for potential_asset_path in asset_paths: asset = await self._download_unity_asset(potential_asset_path, server=server) - for output_path, data in unpack_assets(asset, path): + for output_path, data in unpack_assets(asset, path, server=server): if save: savepath = self.directory / server / output_path savepath.parent.mkdir(exist_ok=True, parents=True) diff --git a/arkprts/assets/git.py b/arkprts/assets/git.py index c6f1268..4efb302 100644 --- a/arkprts/assets/git.py +++ b/arkprts/assets/git.py @@ -27,10 +27,11 @@ LOGGER: logging.Logger = logging.getLogger("arkprts.assets.git") -GAMEDATA_REPOSITORY = "Kengxxiao/ArknightsGameData" -TW_GAMEDATA_REPOSITORY = "aelurum/ArknightsGameData" # zh-tw fork -RESOURCES_REPOSITORY = "Aceship/Arknight-Images" -ALT_RESOURCES_REPOSITORY = "yuanyan3060/ArknightsGameResource" # contains zh-cn files +CN_GAMEDATA_REPOSITORY = "Kengxxiao/ArknightsGameData" # master +GLOBAL_GAMEDATA_REPOSITORY = "Kengxxiao/ArknightsGameData_YoStar" # main +TW_GAMEDATA_REPOSITORY = "aelurum/ArknightsGameData" # zh-tw fork # master_v2 +RESOURCES_REPOSITORY = "Aceship/Arknight-Images" # main +ALT_RESOURCES_REPOSITORY = "yuanyan3060/ArknightsGameResource" # contains zh-cn files # main GAMEDATA_LANGUAGE: typing.Mapping[netn.ArknightsServer, str] = { "en": "en_US", 
@@ -44,27 +45,36 @@ PathLike = typing.Union[pathlib.Path, str] -async def download_github_file(repository: str, path: str) -> bytes: +async def download_github_file(repository: str, path: str, *, branch: str = "HEAD") -> bytes: """Download a file from github.""" - url = f"https://raw.githubusercontent.com/{repository}/HEAD/{path}" + url = f"https://raw.githubusercontent.com/{repository}/{branch}/{path}" async with aiohttp.request("GET", url) as response: response.raise_for_status() return await response.read() -async def get_github_repository_commit(repository: str) -> str: +async def get_github_repository_commit(repository: str, *, branch: str = "HEAD") -> str: """Get the commit hash of a github repository.""" - url = f"https://api.github.com/repos/{repository}/commits/HEAD" + url = f"https://api.github.com/repos/{repository}/commits/{branch}" async with aiohttp.request("GET", url) as response: response.raise_for_status() data = await response.json() return data["sha"] -async def download_github_tarball(repository: str, destination: PathLike | None) -> pathlib.Path: +async def download_github_tarball( + repository: str, + destination: PathLike | None, + *, + branch: str = "HEAD", +) -> pathlib.Path: """Download a tarball from github.""" destination = pathlib.Path(destination or tempfile.mktemp(f"{repository.split('/')[-1]}.tar.gz")) - url = f"https://api.github.com/repos/{repository}/tarball" + if branch == "HEAD": + url = f"https://api.github.com/repos/{repository}/tarball" + else: + url = f"https://github.com/{repository}/archive/refs/heads/{branch}.tar.gz" + async with aiohttp.ClientSession(auto_decompress=False) as session, session.get(url) as response: response.raise_for_status() with destination.open("wb") as file: @@ -92,7 +102,14 @@ def decompress_tarball(path: PathLike, destination: PathLike, *, allow: str = "* return top_directory -async def download_repository(repository: str, destination: PathLike, allow: str = "*", *, force: bool = False) -> None: 
+async def download_repository( + repository: str, + destination: PathLike, + allow: str = "*", + *, + branch: str = "HEAD", + force: bool = False, +) -> None: """Download a repository from github.""" destination = pathlib.Path(destination) commit_file = destination / "commit.txt" @@ -102,7 +119,7 @@ async def download_repository(repository: str, destination: PathLike, allow: str return try: - commit = await get_github_repository_commit(repository) + commit = await get_github_repository_commit(repository, branch=branch) except aiohttp.ClientResponseError: LOGGER.warning("Failed to get %s commit, skipping download", repository, exc_info=True) return @@ -112,7 +129,11 @@ async def download_repository(repository: str, destination: PathLike, allow: str return LOGGER.info("Downloading %s to %s [%s]", repository, str(destination), commit) - tarball_path = await download_github_tarball(repository, destination / f"{repository.split('/')[-1]}.tar.gz") + tarball_path = await download_github_tarball( + repository, + destination / f"{repository.split('/')[-1]}.tar.gz", + branch=branch, + ) LOGGER.debug("Decompressing %s", repository) tarball_commit = decompress_tarball(tarball_path, destination, allow=allow) @@ -127,7 +148,7 @@ async def download_repository(repository: str, destination: PathLike, allow: str LOGGER.info("Downloaded %s", repository) -async def update_git_repository(repository: str, directory: PathLike) -> None: +async def update_git_repository(repository: str, directory: PathLike, *, branch: str = "HEAD") -> None: """Update game data.""" directory = pathlib.Path(directory) @@ -138,6 +159,7 @@ async def update_git_repository(repository: str, directory: PathLike) -> None: "git", "clone", "--depth=1", + *([] if branch == "HEAD" else ["--branch", branch]), f"https://github.com/{repository}.git", cwd=directory.parent, ) @@ -159,12 +181,19 @@ async def _check_git_installed_async() -> bool: return (await proc.wait()) == 0 -async def update_repository(repository: str, 
directory: PathLike, *, allow: str = "*", force: bool = False) -> None: +async def update_repository( + repository: str, + directory: PathLike, + *, + allow: str = "*", + branch: str = "HEAD", + force: bool = False, +) -> None: """Update a repository even if git is not installed.""" if await _check_git_installed_async(): - await update_git_repository(repository, directory) + await update_git_repository(repository, directory, branch=branch) else: - await download_repository(repository, directory, allow=allow, force=force) + await download_repository(repository, directory, allow=allow, branch=branch, force=force) class GitAssets(base.Assets): @@ -188,13 +217,24 @@ def __init__( default_directory = pathlib.Path(tempfile.gettempdir()) - self.gamedata_directory = pathlib.Path(gamedata_directory or default_directory / "ArknightsGameData") - self.resources_directory = pathlib.Path(resources_directory or default_directory / "ArknightsGameResource") - self.gamedata_repository = gamedata_repository or ( - TW_GAMEDATA_REPOSITORY if self.default_server == "tw" else GAMEDATA_REPOSITORY - ) + if gamedata_repository: + self.gamedata_repository = gamedata_repository + elif self.default_server == "cn": + self.gamedata_repository = CN_GAMEDATA_REPOSITORY + elif self.default_server == "tw": + self.gamedata_repository = TW_GAMEDATA_REPOSITORY + else: + self.gamedata_repository = GLOBAL_GAMEDATA_REPOSITORY + self.resources_repository = resources_repository or RESOURCES_REPOSITORY + self.gamedata_directory = pathlib.Path( + gamedata_directory or default_directory / self.gamedata_repository.split("/")[1], + ) + self.resources_directory = pathlib.Path( + resources_directory or default_directory / self.resources_repository.split("/")[1], + ) + async def update_assets( self, resources: bool = False, diff --git a/arkprts/models/data.py b/arkprts/models/data.py index 7c36215..a480670 100644 --- a/arkprts/models/data.py +++ b/arkprts/models/data.py @@ -261,7 +261,7 @@ class 
Troops(base.BaseModel): char_group: typing.Mapping[str, CharGroup] = pydantic.Field(alias="charGroup") """Additional operator data.""" char_mission: typing.Mapping[str, typing.Mapping[str, int]] = pydantic.Field(alias="charMission", repr=False) - """IDK. Special operation missions..""" + """IDK. Special operation missions.""" addon: base.DDict = pydantic.Field(default_factory=base.DDict, repr=False) """IDK. Unlockable character story and stage.""" @@ -271,7 +271,7 @@ class Skins(base.BaseModel): character_skins: typing.Mapping[str, bool] = pydantic.Field(alias="characterSkins") """Owned skins.""" - skin_ts: typing.Mapping[str, base.ArknightsTimestamp] = pydantic.Field({}, alias="skinTs") + skin_ts: typing.Mapping[str, base.ArknightsTimestamp] = pydantic.Field(default={}, alias="skinTs") """When the skins were obtained."""