From 2a343037d733e4d63c8cc6cd4b642260bc2d25b9 Mon Sep 17 00:00:00 2001 From: marios8543 Date: Tue, 14 Nov 2023 22:50:16 +0200 Subject: [PATCH] Infrastructure for custom backend support --- backend/decky_loader/helpers.py | 4 +- backend/decky_loader/loader.py | 5 +- .../decky_loader/localplatform/localsocket.py | 12 +- backend/decky_loader/plugin/binary_plugin.py | 54 +++++++ .../plugin/{plugin.py => plugin_wrapper.py} | 55 +++++-- backend/decky_loader/plugin/python_plugin.py | 101 ++++++++++++ .../decky_loader/plugin/sandboxed_plugin.py | 149 ++++-------------- 7 files changed, 240 insertions(+), 140 deletions(-) create mode 100644 backend/decky_loader/plugin/binary_plugin.py rename backend/decky_loader/plugin/{plugin.py => plugin_wrapper.py} (62%) create mode 100644 backend/decky_loader/plugin/python_plugin.py diff --git a/backend/decky_loader/helpers.py b/backend/decky_loader/helpers.py index e3770c639..be8f2dbaf 100644 --- a/backend/decky_loader/helpers.py +++ b/backend/decky_loader/helpers.py @@ -140,8 +140,8 @@ def get_user_group_id() -> int: return localplatform.localplatform._get_user_group_id() # pyright: ignore [reportPrivateUsage] # Get the default home path unless a user is specified -def get_home_path(username: str | None = None) -> str: - return localplatform.get_home_path(UserType.ROOT if username == "root" else UserType.HOST_USER) +def get_home_path(type: UserType = UserType.HOST_USER) -> str: + return localplatform.get_home_path(type) async def is_systemd_unit_active(unit_name: str) -> bool: return await localplatform.service_active(unit_name) diff --git a/backend/decky_loader/loader.py b/backend/decky_loader/loader.py index 7567912ca..94f1f45ef 100644 --- a/backend/decky_loader/loader.py +++ b/backend/decky_loader/loader.py @@ -17,7 +17,7 @@ from .main import PluginManager from .injector import get_gamepadui_tab -from .plugin.plugin import PluginWrapper +from .plugin.plugin_wrapper import PluginWrapper Plugins = dict[str, PluginWrapper] ReloadQueue = Queue[Tuple[str, str, bool | None] | Tuple[str, str]] @@ -146,8 +146,9 @@ def import_plugin(self, file: str, plugin_directory: str, refresh: bool | None = self.plugins.pop(plugin.name, None) if plugin.passive: self.logger.info(f"Plugin {plugin.name} is passive") - self.plugins[plugin.name] = plugin.start() + self.plugins[plugin.name] = plugin self.plugins[plugin.name].set_emitted_message_callback(log_plugin_emitted_message) + plugin.start() self.logger.info(f"Loaded {plugin.name}") if not batch: self.loop.create_task(self.dispatch_plugin(plugin.name, plugin.version)) diff --git a/backend/decky_loader/localplatform/localsocket.py b/backend/decky_loader/localplatform/localsocket.py index 93b1ea18a..4815a6613 100644 --- a/backend/decky_loader/localplatform/localsocket.py +++ b/backend/decky_loader/localplatform/localsocket.py @@ -7,22 +7,26 @@ BUFFER_LIMIT = 2 ** 20 # 1 MiB class UnixSocket: - def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]): + def __init__(self): ''' on_new_message takes 1 string argument. It's return value gets used, if not None, to write data to the socket. Method should be async ''' self.socket_addr = f"/tmp/plugin_socket_{time.time()}" - self.on_new_message = on_new_message self.socket = None self.reader = None self.writer = None self.server_writer = None + self.on_new_message: Callable[[str], Coroutine[Any, Any, Any]] + async def setup_server(self): self.socket = await asyncio.start_unix_server(self._listen_for_method_call, path=self.socket_addr, limit=BUFFER_LIMIT) + def set_new_message_callback(self, callback: Callable[[str], Coroutine[Any, Any, Any]]): + self.on_new_message = callback + async def _open_socket_if_not_exists(self): if not self.reader: retries = 0 @@ -110,13 +114,13 @@ def _(task: asyncio.Task[str|None]): asyncio.create_task(self.on_new_message(line)).add_done_callback(_) class PortSocket (UnixSocket): - def __init__(self, on_new_message: Callable[[str], Coroutine[Any, Any, Any]]): + def __init__(self): ''' on_new_message takes 1 string argument. It's return value gets used, if not None, to write data to the socket. Method should be async ''' - super().__init__(on_new_message) + super().__init__() self.host = "127.0.0.1" self.port = random.sample(range(40000, 60000), 1)[0] diff --git a/backend/decky_loader/plugin/binary_plugin.py b/backend/decky_loader/plugin/binary_plugin.py new file mode 100644 index 000000000..d1b8cea9e --- /dev/null +++ b/backend/decky_loader/plugin/binary_plugin.py @@ -0,0 +1,54 @@ +from asyncio import StreamReader, create_task, sleep, create_subprocess_exec +from asyncio.subprocess import Process +from subprocess import PIPE + +from .sandboxed_plugin import SandboxedPlugin +from ..localplatform.localsocket import LocalSocket +from ..customtypes import UserType + +from typing import Dict, List + +class BinaryPlugin(SandboxedPlugin): + def __init__(self, + socket: LocalSocket, + name: str, + flags: List[str], + file: str, + plugin_directory: str, + plugin_path: str, + version: str | None, + author: str, + env: Dict[str, str]) -> None: + super().__init__(socket, name, flags, file, plugin_directory, plugin_path, version, author, env) + self.process: Process + + def start(self): + create_task(self._start()) + + async def stop(self): + self.process.terminate() + while not self.process.returncode: + await sleep(0) + + async def _start(self): + self.env["DECKY_SOCKET"] = self.socket.socket_addr + user_type = UserType.ROOT.value if "root" in self.flags else UserType.HOST_USER.value + self.process = await create_subprocess_exec(self.file, + env=self.env, + user=user_type, + group=user_type, + stdout=PIPE, + stderr=PIPE) + assert self.process.stderr and self.process.stdout + create_task(self._stream_watcher(self.process.stdout, False)) + create_task(self._stream_watcher(self.process.stderr, True)) + + async def _stream_watcher(self, stream: StreamReader, is_err: bool): + async for line in stream: + line = line.decode("utf-8") + if not line.strip(): + continue + if is_err: + self.log.error(line) + else: + self.log.info(line) \ No newline at end of file diff --git a/backend/decky_loader/plugin/plugin.py b/backend/decky_loader/plugin/plugin_wrapper.py similarity index 62% rename from backend/decky_loader/plugin/plugin.py rename to backend/decky_loader/plugin/plugin_wrapper.py index 6c3381061..e76290c7f 100644 --- a/backend/decky_loader/plugin/plugin.py +++ b/backend/decky_loader/plugin/plugin_wrapper.py @@ -2,9 +2,9 @@ from json import dumps, load, loads from logging import getLogger from os import path -from multiprocessing import Process -from .sandboxed_plugin import SandboxedPlugin +from .python_plugin import PythonPlugin +from .binary_plugin import BinaryPlugin from .method_call_request import MethodCallRequest from ..localplatform.localsocket import LocalSocket @@ -26,22 +26,32 @@ def __init__(self, file: str, plugin_directory: str, plugin_path: str) -> None: self.name = json["name"] self.author = json["author"] self.flags = json["flags"] - - self.passive = not path.isfile(self.file) + self.env: Dict[str, str] = json["env"] if "env" in json else {} - self.log = getLogger("plugin") + passive = not path.isfile(self.file) + + if "backend" in json: + self.file = path.join(plugin_path, plugin_directory, json["backend"]) + self._socket = LocalSocket() + self.sandboxed_plugin = BinaryPlugin(self._socket, self.name, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author, self.env) + elif not passive: + self._socket = LocalSocket() + self.sandboxed_plugin = PythonPlugin(self._socket, self.name, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author, self.env) + else: + self.sandboxed_plugin = None - self.sandboxed_plugin = SandboxedPlugin(self.name, self.passive, self.flags, self.file, self.plugin_directory, self.plugin_path, self.version, self.author) - #TODO: Maybe make LocalSocket not require on_new_message to make this cleaner - self._socket = LocalSocket(self.sandboxed_plugin.on_new_message) + self.log = getLogger("plugin") self._listener_task: Task[Any] self._method_call_requests: Dict[str, MethodCallRequest] = {} - self.emitted_message_callback: Callable[[Dict[Any, Any]], Coroutine[Any, Any, Any]] def __str__(self) -> str: return self.name + @property + def passive(self): + return not self.sandboxed_plugin + async def _response_listener(self): while True: try: @@ -59,8 +69,8 @@ def set_emitted_message_callback(self, callback: Callable[[Dict[Any, Any]], Coro self.emitted_message_callback = callback async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]): - if self.passive: - raise RuntimeError("This plugin is passive (aka does not implement main.py)") + if not self.sandboxed_plugin: + raise RuntimeError("This plugin is passive and does not implement a backend.") request = MethodCallRequest() await self._socket.get_socket_connection() @@ -69,16 +79,27 @@ async def execute_method(self, method_name: str, kwargs: Dict[Any, Any]): return await request.wait_for_result() - def start(self): - if self.passive: - return self - Process(target=self.sandboxed_plugin.initialize, args=[self._socket]).start() + async def _start(self): + if not self.sandboxed_plugin: + return + await self._socket.setup_server() self._listener_task = create_task(self._response_listener()) - return self + self.sandboxed_plugin.start() + + def start(self): + if not self.sandboxed_plugin: + return + create_task(self._start()) def stop(self): + try: + assert self.sandboxed_plugin + except AssertionError: + return + self._listener_task.cancel() async def _(self: PluginWrapper): - await self._socket.write_single_line(dumps({ "stop": True }, ensure_ascii=False)) + assert self.sandboxed_plugin #Need to assert again or pyright complains. No need to care about this, it will fail above first. + await self.sandboxed_plugin.stop() await self._socket.close_socket_connection() create_task(_(self)) \ No newline at end of file diff --git a/backend/decky_loader/plugin/python_plugin.py b/backend/decky_loader/plugin/python_plugin.py new file mode 100644 index 000000000..e9e7e490d --- /dev/null +++ b/backend/decky_loader/plugin/python_plugin.py @@ -0,0 +1,101 @@ +from asyncio import get_event_loop, new_event_loop, set_event_loop, sleep +from importlib.util import module_from_spec, spec_from_file_location +from json import dumps, loads +from multiprocessing import Process +from sys import path as syspath, modules as sysmodules +from os import path, environ, setgid, setuid +from traceback import format_exc +from signal import signal, SIGINT + +from .sandboxed_plugin import SandboxedPlugin +from .method_call_request import SocketResponseDict +from ..localplatform.localsocket import LocalSocket +from ..customtypes import UserType + +from typing import Any, Dict, List + +class PythonPlugin(SandboxedPlugin): + def __init__(self, + socket: LocalSocket, + name: str, + flags: List[str], + file: str, + plugin_directory: str, + plugin_path: str, + version: str | None, + author: str, + env: Dict[str, str]) -> None: + super().__init__(socket, name, flags, file, plugin_directory, plugin_path, version, author, env) + self.socket.set_new_message_callback(self._on_new_message) + + def start(self): + Process(target=self._initialize).start() + + async def stop(self): + await self._unload() + get_event_loop().stop() + while get_event_loop().is_running(): + await sleep(0) + get_event_loop().close() + raise Exception("Closing message listener") + + def _initialize(self): + signal(SIGINT, lambda s, f: exit(0)) + setgid(UserType.ROOT.value if "root" in self.flags else UserType.HOST_USER.value) + setuid(UserType.ROOT.value if "root" in self.flags else UserType.HOST_USER.value) + environ.update(self.env) + + set_event_loop(new_event_loop()) + + # append the plugin's `py_modules` to the recognized python paths + syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) + + #TODO: FIX IN A LESS CURSED WAY + keys = [key for key in sysmodules if key.startswith("decky_loader.")] + for key in keys: + sysmodules[key.replace("decky_loader.", "")] = sysmodules[key] + + spec = spec_from_file_location("_", self.file) + assert spec is not None + module = module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + self.Plugin = module.Plugin + + setattr(self.Plugin, "emit_message", self._emit_message) + + if hasattr(self.Plugin, "_migration"): + get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin)) + if hasattr(self.Plugin, "_main"): + get_event_loop().create_task(self.Plugin._main(self.Plugin)) + get_event_loop().run_forever() + + async def _unload(self): + try: + self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n") + if hasattr(self.Plugin, "_unload"): + await self.Plugin._unload(self.Plugin) + self.log.info("Unloaded " + self.name + "\n") + else: + self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n") + except: + self.log.error("Failed to unload " + self.name + "!\n" + format_exc()) + exit(0) + + async def _on_new_message(self, message : str) -> str|None: + data = loads(message) + + d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]} + try: + d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) + except Exception as e: + d["res"] = str(e) + d["success"] = False + finally: + return dumps(d, ensure_ascii=False) + + async def _emit_message(self, message: Dict[Any, Any]): + await self.socket.write_single_line_server(dumps({ + "id": "0", + "payload": message + })) \ No newline at end of file diff --git a/backend/decky_loader/plugin/sandboxed_plugin.py b/backend/decky_loader/plugin/sandboxed_plugin.py index 6be97b4a0..b799783ff 100644 --- a/backend/decky_loader/plugin/sandboxed_plugin.py +++ b/backend/decky_loader/plugin/sandboxed_plugin.py @@ -1,33 +1,26 @@ -from os import path, environ -from signal import SIGINT, signal -from importlib.util import module_from_spec, spec_from_file_location -from json import dumps, loads from logging import getLogger -from sys import exit, path as syspath, modules as sysmodules -from traceback import format_exc -from asyncio import (get_event_loop, new_event_loop, - set_event_loop, sleep) +from os import path -from .method_call_request import SocketResponseDict from ..localplatform.localsocket import LocalSocket -from ..localplatform.localplatform import setgid, setuid, get_username, get_home_path -from ..customtypes import UserType from .. import helpers +from ..customtypes import UserType +from ..localplatform.localplatform import get_username -from typing import Any, Dict, List + +from typing import Dict, List class SandboxedPlugin: def __init__(self, + socket: LocalSocket, name: str, - passive: bool, flags: List[str], file: str, plugin_directory: str, plugin_path: str, version: str|None, - author: str) -> None: + author: str, + env: Dict[str, str]) -> None: self.name = name - self.passive = passive self.flags = flags self.file = file self.plugin_path = plugin_path @@ -36,103 +29,29 @@ def __init__(self, self.author = author self.log = getLogger("plugin") - - def initialize(self, socket: LocalSocket): - self._socket = socket - - try: - signal(SIGINT, lambda s, f: exit(0)) - - set_event_loop(new_event_loop()) - if self.passive: - return - setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) - setuid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) - # export a bunch of environment variables to help plugin developers - environ["HOME"] = get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER) - environ["USER"] = "root" if "root" in self.flags else get_username() - environ["DECKY_VERSION"] = helpers.get_loader_version() - environ["DECKY_USER"] = get_username() - environ["DECKY_USER_HOME"] = helpers.get_home_path() - environ["DECKY_HOME"] = helpers.get_homebrew_path() - environ["DECKY_PLUGIN_SETTINGS_DIR"] = path.join(environ["DECKY_HOME"], "settings", self.plugin_directory) - helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "settings")) - helpers.mkdir_as_user(environ["DECKY_PLUGIN_SETTINGS_DIR"]) - environ["DECKY_PLUGIN_RUNTIME_DIR"] = path.join(environ["DECKY_HOME"], "data", self.plugin_directory) - helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "data")) - helpers.mkdir_as_user(environ["DECKY_PLUGIN_RUNTIME_DIR"]) - environ["DECKY_PLUGIN_LOG_DIR"] = path.join(environ["DECKY_HOME"], "logs", self.plugin_directory) - helpers.mkdir_as_user(path.join(environ["DECKY_HOME"], "logs")) - helpers.mkdir_as_user(environ["DECKY_PLUGIN_LOG_DIR"]) - environ["DECKY_PLUGIN_DIR"] = path.join(self.plugin_path, self.plugin_directory) - environ["DECKY_PLUGIN_NAME"] = self.name - if self.version: - environ["DECKY_PLUGIN_VERSION"] = self.version - environ["DECKY_PLUGIN_AUTHOR"] = self.author - - # append the plugin's `py_modules` to the recognized python paths - syspath.append(path.join(environ["DECKY_PLUGIN_DIR"], "py_modules")) - - #TODO: FIX IN A LESS CURSED WAY - keys = [key for key in sysmodules if key.startswith("decky_loader.")] - for key in keys: - sysmodules[key.replace("decky_loader.", "")] = sysmodules[key] - - spec = spec_from_file_location("_", self.file) - assert spec is not None - module = module_from_spec(spec) - assert spec.loader is not None - spec.loader.exec_module(module) - self.Plugin = module.Plugin - - setattr(self.Plugin, "emit_message", self.emit_message) - #TODO: Find how to put emit_message on global namespace so it doesn't pollute Plugin - - if hasattr(self.Plugin, "_migration"): - get_event_loop().run_until_complete(self.Plugin._migration(self.Plugin)) - if hasattr(self.Plugin, "_main"): - get_event_loop().create_task(self.Plugin._main(self.Plugin)) - get_event_loop().create_task(socket.setup_server()) - get_event_loop().run_forever() - except: - self.log.error("Failed to start " + self.name + "!\n" + format_exc()) - exit(0) - - async def _unload(self): - try: - self.log.info("Attempting to unload with plugin " + self.name + "'s \"_unload\" function.\n") - if hasattr(self.Plugin, "_unload"): - await self.Plugin._unload(self.Plugin) - self.log.info("Unloaded " + self.name + "\n") - else: - self.log.info("Could not find \"_unload\" in " + self.name + "'s main.py" + "\n") - except: - self.log.error("Failed to unload " + self.name + "!\n" + format_exc()) - exit(0) - - async def on_new_message(self, message : str) -> str|None: - data = loads(message) - - if "stop" in data: - self.log.info("Calling Loader unload function.") - await self._unload() - get_event_loop().stop() - while get_event_loop().is_running(): - await sleep(0) - get_event_loop().close() - raise Exception("Closing message listener") - - d: SocketResponseDict = {"res": None, "success": True, "id": data["id"]} - try: - d["res"] = await getattr(self.Plugin, data["method"])(self.Plugin, **data["args"]) - except Exception as e: - d["res"] = str(e) - d["success"] = False - finally: - return dumps(d, ensure_ascii=False) - - async def emit_message(self, message: Dict[Any, Any]): - await self._socket.write_single_line_server(dumps({ - "id": "0", - "payload": message - })) \ No newline at end of file + self.env = env + self.socket = socket + + # export a bunch of environment variables to help plugin developers + self.env.update({ + "HOME": helpers.get_home_path(UserType.ROOT if "root" in self.flags else UserType.HOST_USER), + "USER": "root" if "root" in self.flags else get_username(), + "DECKY_VERSION": helpers.get_loader_version(), + "DECKY_USER": get_username(), + "DECKY_USER_HOME": helpers.get_home_path(), + "DECKY_HOME": helpers.get_homebrew_path(), + "DECKY_PLUGIN_SETTINGS_DIR": path.join(helpers.get_homebrew_path(), "settings", self.plugin_directory), + "DECKY_PLUGIN_DIR": path.join(self.plugin_path, self.plugin_directory), + "DECKY_PLUGIN_NAME": self.name, + "DECKY_PLUGIN_AUTHOR": self.author, + "DECKY_PLUGIN_VERSION": self.version or "", + "DECKY_PLUGIN_RUNTIME_DIR": path.join(helpers.get_homebrew_path(), "data", self.plugin_directory), + "DECKY_PLUGIN_LOG_DIR": path.join(helpers.get_homebrew_path(), "logs", self.plugin_directory) + }) + helpers.mkdir_as_user(self.env["DECKY_PLUGIN_SETTINGS_DIR"]) + helpers.mkdir_as_user(self.env["DECKY_PLUGIN_RUNTIME_DIR"]) + helpers.mkdir_as_user(self.env["DECKY_PLUGIN_LOG_DIR"]) + + def start(self): pass + + async def stop(self): pass \ No newline at end of file