diff --git a/MANIFEST.in b/MANIFEST.in index 9af3705..2a0ae03 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,2 @@ include environments/* +include apps/* diff --git a/bioengine/services/cellpose/README.md b/apps/cellpose/README.md similarity index 100% rename from bioengine/services/cellpose/README.md rename to apps/cellpose/README.md diff --git a/bioengine/services/cellpose/predict/1/model.py b/apps/cellpose/cellpose-predict/1/model.py similarity index 100% rename from bioengine/services/cellpose/predict/1/model.py rename to apps/cellpose/cellpose-predict/1/model.py diff --git a/bioengine/services/cellpose/predict/1/pyodide_test.py b/apps/cellpose/cellpose-predict/1/pyodide_test.py similarity index 100% rename from bioengine/services/cellpose/predict/1/pyodide_test.py rename to apps/cellpose/cellpose-predict/1/pyodide_test.py diff --git a/bioengine/services/cellpose/predict/config.pbtxt b/apps/cellpose/cellpose-predict/config.pbtxt similarity index 100% rename from bioengine/services/cellpose/predict/config.pbtxt rename to apps/cellpose/cellpose-predict/config.pbtxt diff --git a/bioengine/services/cellpose/train/1/build_bioimageio_package.py b/apps/cellpose/cellpose-train/1/build_bioimageio_package.py similarity index 100% rename from bioengine/services/cellpose/train/1/build_bioimageio_package.py rename to apps/cellpose/cellpose-train/1/build_bioimageio_package.py diff --git a/bioengine/services/cellpose/train/1/interactive_cellpose.py b/apps/cellpose/cellpose-train/1/interactive_cellpose.py similarity index 100% rename from bioengine/services/cellpose/train/1/interactive_cellpose.py rename to apps/cellpose/cellpose-train/1/interactive_cellpose.py diff --git a/bioengine/services/cellpose/train/1/model.py b/apps/cellpose/cellpose-train/1/model.py similarity index 100% rename from bioengine/services/cellpose/train/1/model.py rename to apps/cellpose/cellpose-train/1/model.py diff --git a/bioengine/services/cellpose/train/1/predict.py b/apps/cellpose/cellpose-train/1/predict.py similarity index 100% rename from bioengine/services/cellpose/train/1/predict.py rename to apps/cellpose/cellpose-train/1/predict.py diff --git a/bioengine/services/cellpose/train/1/test_cellpose_train.py b/apps/cellpose/cellpose-train/1/test_cellpose_train.py similarity index 100% rename from bioengine/services/cellpose/train/1/test_cellpose_train.py rename to apps/cellpose/cellpose-train/1/test_cellpose_train.py diff --git a/bioengine/services/cellpose/train/1/test_native.py b/apps/cellpose/cellpose-train/1/test_native.py similarity index 100% rename from bioengine/services/cellpose/train/1/test_native.py rename to apps/cellpose/cellpose-train/1/test_native.py diff --git a/bioengine/services/cellpose/train/1/test_triton.py b/apps/cellpose/cellpose-train/1/test_triton.py similarity index 100% rename from bioengine/services/cellpose/train/1/test_triton.py rename to apps/cellpose/cellpose-train/1/test_triton.py diff --git a/bioengine/services/cellpose/train/config.pbtxt b/apps/cellpose/cellpose-train/config.pbtxt similarity index 100% rename from bioengine/services/cellpose/train/config.pbtxt rename to apps/cellpose/cellpose-train/config.pbtxt diff --git a/apps/cellpose/main.py b/apps/cellpose/main.py new file mode 100644 index 0000000..6b619d2 --- /dev/null +++ b/apps/cellpose/main.py @@ -0,0 +1,42 @@ +from hypha_launcher.utils.container import ContainerEngine +from pathlib import Path +from pyotritonclient import get_config, execute +from functools import partial +import logging + +logger = logging.getLogger(__name__) + +container_engine = ContainerEngine() +TRITON_IMAGE = "docker://nvcr.io/nvidia/tritonserver:23.03-py3" + + +async def hypha_startup(server): + # get current dir + current_dir = Path(__file__).parent + host_port = "9302" + logger.info(f"Pulling triton image {TRITON_IMAGE}") + container_engine.pull_image(TRITON_IMAGE) + logger.info(f"Starting triton server at port {host_port}") + container_engine.run_command( + f'bash -c "tritonserver --model-repository=/models --log-verbose=3 --log-info=1 --log-warning=1 --log-error=1 --model-control-mode=poll --exit-on-error=false --repository-poll-secs=10 --allow-grpc=False --http-port={host_port}"', # noqa + TRITON_IMAGE, + ports={host_port: host_port}, + volumes={str(current_dir): "/models"}, + ) + + server_url = f"http://localhost:{host_port}" + logger.info(f"Triton server is running at {server_url}") + + svc = await server.register_service({ + "name": "CellPose", + "id": "cellpose", + "config": { + "visibility": "public" + }, + "train": partial(execute, server_url=server_url, model_name="cellpose-train"), + "train_config": partial(get_config, server_url=server_url, model_name="cellpose-train"), + "predict": partial(execute, server_url=server_url, model_name="cellpose-predict"), + "predict_config": partial(get_config, server_url=server_url, model_name="cellpose-predict"), + }) + + logger.info(f"CellPose service is registered as `{svc['id']}`") \ No newline at end of file diff --git a/apps/cellpose/manifest.yaml b/apps/cellpose/manifest.yaml new file mode 100644 index 0000000..d3e169c --- /dev/null +++ b/apps/cellpose/manifest.yaml @@ -0,0 +1,5 @@ +name: Cellpose +id: cellpose +description: Cellpose is a generalist algorithm for cell and nucleus segmentation +runtime: python +entrypoint: main.py diff --git a/apps/imagej/main.py b/apps/imagej/main.py new file mode 100644 index 0000000..11fac1b --- /dev/null +++ b/apps/imagej/main.py @@ -0,0 +1,228 @@ +import logging +import sys, os +import imagej +import scyjava as sj +import asyncio +import traceback +import numpy as np +import xarray as xr +from jpype import JOverride, JImplements + + +logger = logging.getLogger(__name__) +os.environ["JAVA_HOME"] = os.sep.join(sys.executable.split(os.sep)[:-2] + ["jre"]) + +def capture_console(ij, print=True): + logs = {} + logs["stdout"] = [] + logs["stderr"] = [] + + @JImplements("org.scijava.console.OutputListener") + class JavaOutputListener: + @JOverride + def outputOccurred(self, e): + source = e.getSource().toString + output = e.getOutput() + + if print: + if source == "STDOUT": + sys.stdout.write(output) + logs["stdout"].append(output) + elif source == "STDERR": + sys.stderr.write(output) + logs["stderr"].append(output) + else: + output = "[{}] {}".format(source, output) + sys.stderr.write(output) + logs["stderr"].append(output) + + ij.py._outputMapper = JavaOutputListener() + ij.console().addOutputListener(ij.py._outputMapper) + return logs + + +def format_logs(logs): + output = "" + if logs["stdout"]: + output += "STDOUT:\n" + output += "\n".join(logs["stdout"]) + output += "\n" + if logs["stderr"]: + output += "STDERR:\n" + output += "\n".join(logs["stderr"]) + output += "\n" + return output + + +def get_module_info(ij, custom_script, name=None): + name = name or "scijava_script" + ScriptInfo = sj.jimport("org.scijava.script.ScriptInfo") + StringReader = sj.jimport("java.io.StringReader") + moduleinfo = ScriptInfo(ij.getContext(), name, StringReader(custom_script)) + inputs = {} + outputs = {} + + for inp in ij.py.from_java(moduleinfo.inputs()): + input_type = str(inp.getType().getName()) + input_name = str(inp.getName()) + print(input_type, input_name) + inputs[input_name] = {"name": input_name, "type": input_type} + + for outp in ij.py.from_java(moduleinfo.outputs()): + output_type = str(outp.getType().getName()) + output_name = str(outp.getName()) + outputs[output_name] = {"name": output_name, "type": output_type} + + return {"id": moduleinfo.getIdentifier(), "outputs": outputs, "inputs": inputs} + + +def check_size(array): + result_bytes = array.tobytes() + if len(result_bytes) > 20000000: # 20MB + raise Exception( + f"The data is too large ({len(result_bytes)} bytes) to be transfered." + ) + + +async def execute(config, context=None): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, run_imagej, config) + + +def run_imagej(config): + headless = config.get("headless", False) + ij = imagej.init(os.environ["IMAGEJ_DIR"], headless=headless) + try: + WindowManager = sj.jimport("ij.WindowManager") + ImagePlus = sj.jimport("ij.ImagePlus") + logs = capture_console(ij) + script = config.get("script") + lang = config.get("lang", "ijm") + assert script is not None, "script is required" + module_info = get_module_info(ij, script) + inputs_info = module_info["inputs"] + outputs_info = module_info["outputs"] + inputs = config.get("inputs", {}) + select_outputs = config.get("select_outputs") + args = {} + for k in inputs: + if isinstance(inputs[k], (np.ndarray, np.generic, dict)): + if isinstance(inputs[k], (np.ndarray, np.generic)): + if inputs[k].ndim == 2: + dims = ["x", "y"] + elif inputs[k].ndim == 3 and inputs[k].shape[2] in [1, 3, 4]: + dims = ["x", "y", "c"] + elif inputs[k].ndim == 3 and inputs[k].shape[0] in [1, 3, 4]: + dims = ["c", "x", "y"] + elif inputs[k].ndim == 3: + dims = ["z", "x", "y"] + elif inputs[k].ndim == 4: + dims = ["z", "x", "y", "c"] + elif inputs[k].ndim == 5: + dims = ["t", "z", "x", "y", "c"] + else: + raise Exception(f"Unsupported ndim: {inputs[k].ndim}") + inputs[k] = {"data": inputs[k], "dims": dims} + + img = inputs[k] + assert isinstance( + img, dict + ), f"input {k} must be a dictionary or a numpy array" + assert "data" in img, f"data is required for {k}" + assert "dims" in img, f"dims is required for {k}" + da = xr.DataArray( + data=img["data"], + dims=img["dims"], + attrs=img.get("attrs", {}), + name=k, + ) + inputs[k] = ij.py.to_java(da) + if lang == "ijm": + # convert to ImagePlus + inputs[k] = ij.convert().convert(inputs[k], ImagePlus) + if inputs[k]: + inputs[k].setTitle(k) + # Display the image + if not headless: + inputs[k].show() + else: + raise NotImplementedError( + "Don't know how to display the image (only ijm is supported)." + ) + if k in inputs_info: + args[k] = ij.py.to_java(inputs[k]) + + # Run the script + macro_result = ij.py.run_script(lang, script, args) + results = {} + if select_outputs is None: + select_outputs = list(outputs_info.keys()) + for k in select_outputs: + if k in outputs_info: + results[k] = macro_result.getOutput(k) + if results[k] and not isinstance(results[k], (int, str, float, bool)): + try: + results[k] = ij.py.from_java(results[k]).to_numpy() + check_size(results[k]) + except Exception: + # TODO: This is needed due to a bug in pyimagej for converting java string + if str(type(results[k])) == "": + results[k] = str(results[k]) + else: + results[k] = { + "type": str(type(results[k])), + "text": str(results[k]), + } + else: + # If the output name is not in the script annotation, + # Try to get the image from the WindowManager by title + img = WindowManager.getImage(k) + if not img: + raise Exception(f"Output not found: {k}\n{format_logs(logs)}") + results[k] = ij.py.from_java(img).to_numpy() + check_size(results[k]) + except Exception as exp: + raise exp + finally: + ij.dispose() + + return {"outputs": results, "logs": logs} + + + +test_macro = """ +#@ String name +#@ int age +#@ String city +#@output Object greeting +greeting = "Hi " + name + ". You are " + age + " years old, and live in " + city + "." +""" + +async def hypha_startup(server): + try: + print("Testing the imagej service...") + ret = await execute( + { + "script": test_macro, + "inputs": {"name": "Tom", "age": 20, "city": "Shanghai"}, + } + ) + outputs = ret["outputs"] + assert ( + outputs["greeting"] == "Hi Tom. You are 20 years old, and live in Shanghai." + ) + except Exception: + print(traceback.format_exc()) + sys.exit(1) + + logger.info("Starting the imagej service...") + svc = await server.register_service( + { + "id": "imagej-service", + "type": "imagej-service", + "config": {"require_context": True, "visibility": "public"}, + "execute": execute, + } + ) + + logger.info(f"ImageJ service is registered as `{svc['id']}`") \ No newline at end of file diff --git a/apps/imagej/manifest.yaml b/apps/imagej/manifest.yaml new file mode 100644 index 0000000..f0e025f --- /dev/null +++ b/apps/imagej/manifest.yaml @@ -0,0 +1,5 @@ +name: ImageJ +id: imagej +description: ImageJ is a public domain Java image processing program inspired by NIH Image for the Macintosh. +runtime: conda +entrypoint: main.py diff --git a/bioengine/__init__.py b/bioengine/__init__.py index 1814276..3202f6a 100644 --- a/bioengine/__init__.py +++ b/bioengine/__init__.py @@ -1,6 +1,13 @@ +from bioengine.app_loader import load_apps +import logging + +logger = logging.getLogger(__name__) def connect_server(server_url): raise NotImplementedError("This function is not implemented yet.") -def register_bioengine(server): - raise NotImplementedError("This function is not implemented yet.") \ No newline at end of file +async def register_bioengine_apps(server): + for app in load_apps(): + logger.info(f"Registering service {app.name}") + await app.run(server) + logger.info(f"Service {app.name} registered.") diff --git a/bioengine/__main__.py b/bioengine/__main__.py index 4527e31..6f51a0a 100644 --- a/bioengine/__main__.py +++ b/bioengine/__main__.py @@ -2,11 +2,11 @@ import argparse import asyncio import subprocess -import os +# import os def start_server(args): # get current file path so we can get the path of apps under the same directory - current_dir = os.path.dirname(os.path.abspath(__file__)) + # current_dir = os.path.dirname(os.path.abspath(__file__)) command = [ sys.executable, "-m", @@ -14,14 +14,12 @@ def start_server(args): f"--host={args.host}", f"--port={args.port}", f"--public-base-url={args.public_base_url}", - "--startup-functions=bioengine:register_bioengine" + "--startup-functions=bioengine:register_bioengine_apps" ] subprocess.run(command) def connect_server(args): from bioengine import connect_server - if args.login_required: - os.environ["BIOIMAGEIO_LOGIN_REQUIRED"] = "true" server_url = args.server_url loop = asyncio.get_event_loop() loop.create_task(connect_server(server_url)) @@ -33,10 +31,6 @@ def main(): subparsers = parser.add_subparsers() - # Init command - parser_init = subparsers.add_parser("init") - parser_init.set_defaults(func=init) - # Start server command parser_start_server = subparsers.add_parser("start-server") parser_start_server.add_argument("--host", type=str, default="0.0.0.0") diff --git a/bioengine/app_loader.py b/bioengine/app_loader.py new file mode 100644 index 0000000..ccf3a40 --- /dev/null +++ b/bioengine/app_loader.py @@ -0,0 +1,45 @@ +from pydantic import BaseModel +from pathlib import Path +import logging +from yaml import safe_load +from typing import Callable, Optional +import sys +import importlib.util + +logger = logging.getLogger(__name__) + +# define runtime type enum for app runtime +class AppRuntime(str): + python = "python" + pyodide = "pyodide" + triton = "triton" + +class AppInfo(BaseModel): + name: str + id: str + description: str + runtime: AppRuntime + entrypoint: Optional[str] = None + + async def run(self, server): + if self.runtime == AppRuntime.python: + assert self.entrypoint + + file_path = Path(__file__).parent.parent / "apps" / self.id / self.entrypoint + module_name = 'bioengine.apps.' + self.id.replace('-', '_') + spec = importlib.util.spec_from_file_location(module_name, file_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + assert hasattr(module, "hypha_startup") + await module.hypha_startup(server) + +def load_apps(): + current_dir = Path(__file__).parent + apps_dir = current_dir.parent / "apps" + # list folders under apps_dir + for app_dir in apps_dir.iterdir(): + if app_dir.is_dir(): + manifest_file = app_dir / "manifest.yaml" + if manifest_file.exists(): + manifest = safe_load(manifest_file.read_text()) + yield AppInfo.parse_obj(manifest) diff --git a/bioengine/services/__init__.py b/bioengine/services/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bioengine/services/cellpose/__init__.py b/bioengine/services/cellpose/__init__.py deleted file mode 100644 index 7970d8a..0000000 --- a/bioengine/services/cellpose/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ - -import tritonclient - -async def hypha_startup(server): - hypha_launcher = await server.get_service("hypha-launcher") - triton_server = await hypha_launcher.launch(image="nvcr.io/nvidia/tritonserver:22.04-py3", command="tritonserver --model-repository=/models", name="triton", ports=[8000]) - - async def train(**kwargs): - return tritonclient.execute(**kwargs, server_url=triton_server.server_url) - - await server.register_service({ - "id": "cellpose", - "name": "Cellpose", - "description": "A service to run cellpose", - "config":{ - "visibility": "public", - "run_in_executor": True, - }, - "train": train, - }) - -print("Hypha startup script loaded") \ No newline at end of file diff --git a/bioengine/services/http-file-server/__init__.py b/bioengine/services/http-file-server/__init__.py deleted file mode 100644 index 0feb5e9..0000000 --- a/bioengine/services/http-file-server/__init__.py +++ /dev/null @@ -1,71 +0,0 @@ -import micropip -await micropip.install(["ssl", "fastapi==0.70.0"]) - -import random -import string -import os -from fastapi import FastAPI, HTTPException -from fastapi.responses import FileResponse - -def create_fastapi_app(server_url): - app = FastAPI() - - @app.get("/files/{path:path}") - async def serve_my_app(path: str): - # Construct the full file path - file_path = os.path.abspath(os.path.join("/mnt", path)) - - # Security check: Ensure the file path is within /mnt directory - if not file_path.startswith(os.path.abspath("/mnt")): - raise HTTPException(status_code=400, detail="Invalid file path") - - # Check if the path is a file and serve it - if os.path.isfile(file_path): - return FileResponse(file_path) - else: - raise HTTPException(status_code=404, detail="File not found") - - @app.get("/", response_class=HTMLResponse) - async def index(): - files = os.listdir("/mnt") - return f''' - - - Hello - - -

Files

- -
- - - - ''' - return app - -async def hypha_startup(server): - # Registering fastapi app - connection_info = await server.get_connection_info() - fastapi_app = create_fastapi_app(connection_info["public_base_url"]) - async def serve_fastapi(args): - scope = args["scope"] - print(f'{scope["client"]} - {scope["method"]} - {scope["path"]}') - await fastapi_app(args["scope"], args["receive"], args["send"]) - - await server.register_service({ - "id": "http-" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)), - "name": "FastAPI HTTP App", - "description": "A http server to serve all the mounted files in a simple example page.", - "type": "ASGI", - "serve": serve_fastapi, - "config":{ - "visibility": "public" - }, - "example_script": f"""print("HTTP server running at {connection_info.public_base_url}/{server.config['workspace']}/apps/fastapi-http-app/")""" - }) - - print(f"Server app running at {connection_info.public_base_url}/{server.config['workspace']}/apps/fastapi-http-app/") - -print("Hypha startup script loaded") \ No newline at end of file diff --git a/environments/pyimagej-py310.yml b/environments/pyimagej-py310.yml new file mode 100644 index 0000000..40f874a --- /dev/null +++ b/environments/pyimagej-py310.yml @@ -0,0 +1,7 @@ +name: pyimagej-py310 +channels: + - conda-forge + - defaults +dependencies: + - pyimagej=1.4.1 + - openjdk=8 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6a9ba71..dfc52a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,9 @@ readme = "README.md" description = "Hypha Services for the BioEngine" dependencies = [ "imjoy-rpc", + "PyYAML", + "hypha-launcher", + "pyotritonclient" ] [tool.setuptools] diff --git a/requirements.txt b/requirements.txt index 9ac62f2..0da3eee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ imjoy-rpc -hypha \ No newline at end of file +hypha +PyYAML==0.6.1 +hypha-launcher +pyotritonclient \ No newline at end of file