diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 8b7ef234..ae3eeadc 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -42,7 +42,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - crate: [libertem_dectris, libertem_asi_tpx3, ipc_test] + crate: [libertem_dectris, libertem_asi_tpx3, libertem_asi_mpx3, ipc_test] steps: - uses: actions/checkout@v4 with: @@ -93,7 +93,7 @@ jobs: runs-on: windows-latest strategy: matrix: - crate: [libertem_dectris, libertem_asi_tpx3, ipc_test] + crate: [libertem_dectris, libertem_asi_tpx3, libertem_asi_mpx3, ipc_test] steps: - name: Install LLVM and Clang # required for bindgen to work, see https://github.com/rust-lang/rust-bindgen/issues/1797 uses: KyleMayes/install-llvm-action@v2.0.2 @@ -124,7 +124,7 @@ jobs: MACOSX_DEPLOYMENT_TARGET: "11.0" strategy: matrix: - crate: [libertem_dectris, libertem_asi_tpx3] + crate: [libertem_dectris, libertem_asi_tpx3, libertem_asi_mpx3] os: [macos-12, macos-14] runs-on: ${{ matrix.os }} steps: diff --git a/Cargo.lock b/Cargo.lock index af74b826..08a95edd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -499,6 +499,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "bincode" version = "1.3.3" @@ -1139,6 +1145,15 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "enumflags2" version = "0.7.8" @@ -1342,6 +1357,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", +] + [[package]] name = "futures-core" version = "0.3.30" @@ -1562,6 +1586,25 @@ dependencies = [ "gl_generator", ] +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap 2.2.2", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.2.1" @@ -1630,18 +1673,72 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.5", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.5.0" @@ -1740,6 +1837,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "itertools" version = "0.10.5" @@ -1844,6 +1947,24 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libertem-asi-mpx3" +version = "0.2.11" +dependencies = [ + "bincode", + "clap 3.2.25", + "crossbeam", + "crossbeam-channel", + "env_logger", + "ipc-test", + "log", + "pyo3", + "serde", + "serval-client", + "stats", + "tempfile", +] + [[package]] name = "libertem-asi-tpx3" version = "0.2.11" @@ -2047,6 +2168,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2075,6 +2202,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndarray" version = "0.15.6" @@ -2395,6 +2540,50 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.4.2", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "orbclient" version = "0.3.47" @@ -2846,6 +3035,46 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2885,6 +3114,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "ryu" version = "1.0.16" @@ -2900,6 +3138,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -2925,6 +3172,29 @@ dependencies = [ "tiny-skia", ] +[[package]] +name = "security-framework" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f3cc463c0ef97e11c3461a9d3787412d30e8e7eb907c79180c4a57bf7c04ef" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "sendfd" version = "0.4.3" @@ -2997,6 +3267,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serval-client" +version = "0.1.0" +dependencies = [ + "reqwest", + "serde", + "serde_json", + "url", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3195,6 +3487,33 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "system-deps" version = "6.2.0" @@ -3339,6 +3658,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.18.0" @@ -3351,6 +3680,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-util" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "toml" version = "0.8.9" @@ -3396,6 +3739,12 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" @@ -3453,6 +3802,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "ttf-parser" version = "0.20.0" @@ -3465,7 +3820,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" dependencies = [ - "base64", + "base64 0.13.1", "byteorder", "bytes", "http", @@ -3561,6 +3916,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -3595,6 +3956,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -4158,6 +4528,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if 1.0.0", + "windows-sys 0.48.0", +] + [[package]] name = "x11-dl" version = "2.21.0" diff --git a/Cargo.toml b/Cargo.toml index 434d8043..00e890a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,10 +2,12 @@ members = [ "libertem_dectris", "libertem_asi_tpx3", + "libertem_asi_mpx3", "ipc_test", "playegui", "bs-sys", "stats", + "serval-client", ] [profile.release] diff --git a/libertem_asi_mpx3/Cargo.toml b/libertem_asi_mpx3/Cargo.toml new file mode 100644 index 00000000..618262b7 --- /dev/null +++ b/libertem_asi_mpx3/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "libertem-asi-mpx3" +authors = ["Alexander Clausen "] +license = "MIT" +version = "0.2.11" +edition = "2021" +readme = "README.md" +rust-version = "1.66" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "libertem_asi_mpx3" +crate-type = ["cdylib"] + +[dependencies] +bincode = "1.3.3" +clap = { version = "3.2.16", features = ["derive"] } +crossbeam = "0.8.2" +crossbeam-channel = "0.5.6" +env_logger = "0.9.3" +log = "0.4.17" +pyo3 = { version = "0.21.0", features = ["abi3-py37"] } +serde = { version = "1.0.143", features = ["derive"] } +ipc-test = { path = "../ipc_test" } +serval-client = { path = "../serval-client" } +stats = { path = "../stats" } + +[features] +extension-module = ["pyo3/extension-module"] + +[dev-dependencies] +tempfile = "3.3.0" diff --git a/libertem_asi_mpx3/README.md b/libertem_asi_mpx3/README.md new file mode 100644 index 00000000..e69de29b diff --git a/libertem_asi_mpx3/examples/live_server.py b/libertem_asi_mpx3/examples/live_server.py new file mode 100644 index 00000000..d75d866e --- /dev/null +++ b/libertem_asi_mpx3/examples/live_server.py @@ -0,0 +1,373 @@ +import time +import json +import asyncio +import typing +import copy +from collections import OrderedDict + +import numba +import numpy as np +import uuid +import websockets +from websockets.legacy.server import WebSocketServerProtocol +from websockets.legacy.client import WebSocketClientProtocol +import bitshuffle + +from libertem import masks +from libertem.udf.sum import SumUDF +from libertem.udf.sumsigudf import SumSigUDF +from libertem.udf.masks import ApplyMasksUDF +from libertem.executor.pipelined import PipelinedExecutor +from libertem_live.api import LiveContext +from libertem_live.udf.monitor import ( + SignalMonitorUDF, PartitionMonitorUDF +) + +from libertem.udf.base import UDFResults, UDF +from libertem.common.async_utils import sync_to_async + +if typing.TYPE_CHECKING: + from libertem.common.executor import JobExecutor + + +class EncodedResult: + def __init__( + self, + compressed_data: memoryview, + bbox: typing.Tuple[int, int, int, int], + full_shape: typing.Tuple[int, int], + delta_shape: typing.Tuple[int, int], + dtype: str, + channel_name: str, + udf_name: str, + ): + self.compressed_data = compressed_data + self.bbox = bbox + self.full_shape = full_shape + self.delta_shape = delta_shape + self.dtype = dtype + self.channel_name = channel_name + self.udf_name = udf_name + + def is_empty(self): + return len(self.compressed_data) == 0 + + +@numba.njit(cache=True) +def get_bbox(arr) -> typing.Tuple[int, ...]: + xmin = arr.shape[1] + ymin = arr.shape[0] + xmax = 0 + ymax = 0 + + for y in range(arr.shape[0]): + for x in range(arr.shape[1]): + value = arr[y, x] + if abs(value) < 1e-8: + continue + # got a non-zero value, update indices + if x < xmin: + xmin = x + if x > xmax: + xmax = x + if y < ymin: + ymin = y + if y > ymax: + ymax = y + return int(ymin), int(ymax), int(xmin), int(xmax) + + +class SingleMaskUDF(ApplyMasksUDF): + def get_result_buffers(self): + dtype = np.result_type(self.meta.input_dtype, self.get_mask_dtype()) + return { + 'intensity': self.buffer( + kind='nav', extra_shape=(1,), dtype=dtype, where='device', use='internal', + ), + 'intensity_nav': self.buffer( + kind='nav', extra_shape=(), dtype=dtype, where='device', use='result', + ), + } + + def get_results(self): + # bummer: we can't reshape the data, as the extra_shape from the buffer + # will override our desired shape. so we have to use two result buffers + # instead: + return { + 'intensity_nav': self.results.intensity.reshape(self.meta.dataset_shape.nav), + } + + +class WSServer: + def __init__(self): + self.connect() + self.ws_connected = set() + self.parameters = { + 'cx': 516/2.0, + 'cy': 512/2.0, + 'ri': 200.0, + 'ro': 530.0, + } + self.udfs = self.get_udfs() + + def get_udfs(self): + cx = self.parameters['cx'] + cy = self.parameters['cy'] + ri = self.parameters['ri'] + ro = self.parameters['ro'] + + def _ring(): + return masks.ring( + centerX=cx, + centerY=cy, + imageSizeX=516, + imageSizeY=516, + radius=ro, + radius_inner=ri) + + mask_udf = SingleMaskUDF(mask_factories=[_ring]) + return OrderedDict({ + # "brightfield": SumSigUDF(), + "annular": mask_udf, + # "sum": SumUDF(), + # "monitor": SignalMonitorUDF(), + "monitor_partition": PartitionMonitorUDF(), + }) + + async def __call__(self, websocket: WebSocketServerProtocol): + await self.send_state_dump(websocket) + await self.client_loop(websocket) + + async def send_state_dump(self, websocket: WebSocketClientProtocol): + await websocket.send(json.dumps({ + 'event': 'UPDATE_PARAMS', + 'parameters': self.parameters, + })) + + def register_client(self, websocket): + self.ws_connected.add(websocket) + + def unregister_client(self, websocket): + self.ws_connected.remove(websocket) + + async def client_loop(self, websocket: WebSocketClientProtocol): + try: + self.register_client(websocket) + try: + await self.send_state_dump(websocket) + async for msg in websocket: + await self.handle_message(msg, websocket) + except websockets.exceptions.ConnectionClosedError: + await websocket.close() + finally: + self.unregister_client(websocket) + + async def handle_message(self, msg, websocket): + try: + msg = json.loads(msg) + # FIXME: hack to not require the 'event' "tag": + if 'event' not in msg or msg['event'] == 'UPDATE_PARAMS': + print(f"parameter update: {msg}") + self.parameters = msg['parameters'] + self.udfs = self.get_udfs() + # broadcast to all clients: + msg['event'] = 'UPDATE_PARAMS' + await self.broadcast(json.dumps(msg)) + except Exception as e: + print(e) + + async def broadcast(self, msg): + websockets.broadcast(self.ws_connected, msg) + + async def make_deltas(self, partial_results: UDFResults, previous_results: typing.Optional[UDFResults]) -> np.ndarray: + deltas = [] + udf_names = list(self.udfs.keys()) + for idx in range(len(partial_results.buffers)): + udf_name = udf_names[idx] + for channel_name in partial_results.buffers[idx].keys(): + data = partial_results.buffers[idx][channel_name].data + if previous_results is None: + data_previous = np.zeros_like(data) + else: + data_previous = previous_results.buffers[idx][channel_name].data + + delta = data - data_previous + deltas.append({ + 'delta': delta, + 'udf_name': udf_name, + 'channel_name': channel_name, + }) + return deltas + + async def encode_result(self, delta: np.ndarray, udf_name: str, channel_name: str) -> EncodedResult: + """ + Slice `delta` to its non-zero region and compress that. Returns the information + needed to reconstruct the the full result. + """ + nonzero_mask = ~np.isclose(0, delta) + + if np.count_nonzero(nonzero_mask) == 0: + print(f"zero-delta update, skipping") + # skip this update if it is all-zero + return EncodedResult( + compressed_data=memoryview(b""), + bbox=(0, 0, 0, 0), + full_shape=delta.shape, + delta_shape=(0, 0), + dtype=delta.dtype, + channel_name=channel_name, + udf_name=udf_name, + ) + + bbox = get_bbox(delta) + ymin, ymax, xmin, xmax = bbox + delta_for_blit = delta[ymin:ymax + 1, xmin:xmax + 1] + # print(delta_for_blit.shape, delta_for_blit, list(delta_for_blit[-1])) + + # FIXME: remove allocating copy - maybe copy into pre-allocated buffer instead? + # compressed = await sync_to_async(lambda: lz4.frame.compress(np.copy(delta_for_blit))) + compressed = await sync_to_async(lambda: bitshuffle.compress_lz4(np.copy(delta_for_blit))) + + return EncodedResult( + compressed_data=memoryview(compressed), + bbox=bbox, + full_shape=delta.shape, + delta_shape=delta_for_blit.shape, + dtype=delta_for_blit.dtype, + channel_name=channel_name, + udf_name=udf_name, + ) + + async def handle_pending_acquisition(self, pending) -> str: + acq_id = str(uuid.uuid4()) + await self.broadcast(json.dumps({ + "event": "ACQUISITION_STARTED", + "id": acq_id, + })) + return acq_id + + async def handle_acquisition_end(self, pending, acq_id: str): + await self.broadcast(json.dumps({ + "event": "ACQUISITION_STARTED", + "id": acq_id, + })) + + async def handle_acquisition_end(self, pending, acq_id: str): + await self.broadcast(json.dumps({ + "event": "ACQUISITION_ENDED", + "id": acq_id, + })) + + async def handle_partial_result( + self, + partial_results: UDFResults, + pending_aq, + acq_id: str, + previous_results: typing.Optional[UDFResults] + ): + deltas = await self.make_deltas(partial_results, previous_results) + + delta_results: typing.List[EncodedResult] = [] + for delta in deltas: + delta_results.append( + await self.encode_result( + delta['delta'], + delta['udf_name'], + delta['channel_name'] + ) + ) + await self.broadcast(json.dumps({ + "event": "RESULT", + "id": acq_id, + "channels": [ + { + "bbox": result.bbox, + "full_shape": result.full_shape, + "delta_shape": result.delta_shape, + "dtype": str(result.dtype), + "encoding": "bslz4", + "channel_name": result.channel_name, + "udf_name": result.udf_name, + } + for result in delta_results + ], + })) + for result in delta_results: + await self.broadcast(result.compressed_data) + + async def acquisition_loop(self): + min_delta = 0.05 + while True: + pending_aq = await sync_to_async(self.conn.wait_for_acquisition, timeout=10) + if pending_aq is None: + continue + try: + acq_id = await self.handle_pending_acquisition(pending_aq) + print(f"acquisition starting with id={acq_id}") + t0 = time.perf_counter() + previous_results = None + partial_results = None + aq = self.ctx.make_acquisition( + conn=self.conn, + pending_aq=pending_aq, + frames_per_partition=25, + ) + last_update = 0 + try: + udfs_only = list(self.udfs.values()) + async for partial_results in self.ctx.run_udf_iter(dataset=aq, udf=udfs_only, sync=False): + if time.time() - last_update > min_delta: + await self.handle_partial_result(partial_results, pending_aq, acq_id, previous_results) + previous_results = copy.deepcopy(partial_results) + last_update = time.time() + await self.handle_partial_result(partial_results, pending_aq, acq_id, previous_results) + except Exception as e: + import traceback + traceback.print_exc() + self.ctx.close() + self.conn.close() + self.connect() + previous_results = copy.deepcopy(partial_results) + finally: + await self.handle_acquisition_end(pending_aq, acq_id) + previous_results = None + t1 = time.perf_counter() + print(f"acquisition done with id={acq_id}; took {t1-t0:.3f}s") + + async def serve(self): + async with websockets.serve(self, "localhost", 8444): + try: + await self.acquisition_loop() + finally: + self.conn.close() + self.ctx.close() + + def connect(self): + executor = PipelinedExecutor( + spec=PipelinedExecutor.make_spec( + cpus=range(16), cudas=[] + ), + pin_workers=False, + # delayed_gc=False, + ) + ctx = LiveContext(executor=executor) + conn = ctx.make_connection('asi_mpx3').open( + data_host="localhost", + data_port=8283, + api_host="localhost", + api_port=8080, + buffer_size=2048, + ) + + self.conn = conn + self.executor = executor + self.ctx = ctx + +async def main(): + server = WSServer() + await server.serve() + + +if __name__ == "__main__": + + asyncio.run(main()) diff --git a/libertem_asi_mpx3/examples/simple.py b/libertem_asi_mpx3/examples/simple.py new file mode 100644 index 00000000..7f39bd89 --- /dev/null +++ b/libertem_asi_mpx3/examples/simple.py @@ -0,0 +1,60 @@ +import numpy as np +import tqdm +import libertem_asi_mpx3 + +conn = libertem_asi_mpx3.ServalConnection( + data_uri="localhost:8283", + api_uri="http://localhost:8080", + handle_path="/tmp/asi_mpx3_shm", + frame_stack_size=16, + num_slots=2000, + bytes_per_frame=512*512*2, + huge=False, +) + +conn.start_passive() + +cam_client = None + +try: + while True: + config = None + while config is None: + print("connecting...") + config = conn.wait_for_arm(10.0) + + assert config is not None + print(config) + + # any other process can use a `CamClient` to use data + # stored in the SHM: + cam_client = libertem_asi_mpx3.CamClient(conn.get_socket_path()) + + tq = tqdm.tqdm(total=config.get_n_triggers() * 512 * 512 * 2, unit='B', unit_scale=True, unit_divisor=1024) + + while True: + # get at most `max_size` frames as a stack + # (might get less at the end of the acquisition) + stack_handle = conn.get_next_stack(max_size=32) + + # if the receiver is idle, stack_handle will be None here: + if stack_handle is None: + break + + # the expected shape and data type: + frame_shape = tuple(stack_handle.get_shape()) + + frames = cam_client.get_frames(stack_handle) + + tq.update(len(frames) * 512 * 512 * 2) + + del frames # let's hope no-one else keeps a reference, as it will be invalid after `done` is called + + # free up the shared memory slot for this frame stack: + cam_client.done(stack_handle) + + tq.close() +finally: + conn.close() # clean up background thread etc. + if cam_client is not None: + cam_client.close() diff --git a/libertem_asi_mpx3/pyproject.toml b/libertem_asi_mpx3/pyproject.toml new file mode 100644 index 00000000..b3d80d80 --- /dev/null +++ b/libertem_asi_mpx3/pyproject.toml @@ -0,0 +1,20 @@ +[build-system] +requires = ["maturin>=0.13,<0.14"] +build-backend = "maturin" + +[project] +name = "libertem-asi-mpx3" +requires-python = ">=3.7" +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dependencies = [ +] + +[project.urls] +repository = "https://github.com/LiberTEM/LiberTEM-rs/" + +[tool.maturin] +features = ["pyo3/extension-module"] diff --git a/libertem_asi_mpx3/rust-toolchain.toml b/libertem_asi_mpx3/rust-toolchain.toml new file mode 100644 index 00000000..292fe499 --- /dev/null +++ b/libertem_asi_mpx3/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "stable" diff --git a/libertem_asi_mpx3/src/cam_client.rs b/libertem_asi_mpx3/src/cam_client.rs new file mode 100644 index 00000000..faf79d54 --- /dev/null +++ b/libertem_asi_mpx3/src/cam_client.rs @@ -0,0 +1,87 @@ +use ipc_test::SharedSlabAllocator; +use log::trace; +use std::ffi::c_int; + +use crate::{common::DType, exceptions::ConnectionError, frame_stack::FrameStackHandle}; +use pyo3::{exceptions::PyRuntimeError, ffi::PyMemoryView_FromMemory, prelude::*, FromPyPointer}; + +#[pyclass] +pub struct CamClient { + shm: Option, +} + +#[allow(non_upper_case_globals)] +const PyBUF_READ: c_int = 0x100; + +impl CamClient { + fn get_memoryview(&self, py: Python, raw_data: &[u8]) -> PyObject { + let ptr = raw_data.as_ptr(); + let length = raw_data.len(); + + let mv = unsafe { + PyMemoryView_FromMemory(ptr as *mut i8, length.try_into().unwrap(), PyBUF_READ) + }; + let from_ptr: &PyAny = unsafe { FromPyPointer::from_owned_ptr(py, mv) }; + from_ptr.into_py(py) + } +} + +#[pymethods] +impl CamClient { + #[new] + fn new(handle_path: &str) -> PyResult { + match SharedSlabAllocator::connect(handle_path) { + Ok(shm) => Ok(CamClient { shm: Some(shm) }), + Err(e) => { + let msg = format!("failed to connect to SHM: {:?}", e); + Err(ConnectionError::new_err(msg)) + } + } + } + + fn get_frames( + &self, + handle: &FrameStackHandle, + py: Python, + ) -> PyResult> { + let slot: ipc_test::Slot = if let Some(shm) = &self.shm { + shm.get(handle.slot.slot_idx) + } else { + return Err(PyRuntimeError::new_err("can't decompress with closed SHM")); + }; + + Ok(handle + .get_meta() + .iter() + .zip(0..) + .map(|(meta, idx)| { + let image_data = handle.get_slice_for_frame(idx, &slot); + let memory_view = self.get_memoryview(py, image_data); + + (memory_view, meta.dtype.clone()) + }) + .collect()) + } + + fn done(mut slf: PyRefMut, handle: &FrameStackHandle) -> PyResult<()> { + let slot_idx = handle.slot.slot_idx; + if let Some(shm) = &mut slf.shm { + shm.free_idx(slot_idx); + Ok(()) + } else { + Err(PyRuntimeError::new_err( + "CamClient.done called with SHM closed", + )) + } + } + + fn close(&mut self) { + self.shm.take(); + } +} + +impl Drop for CamClient { + fn drop(&mut self) { + trace!("CamClient::drop"); + } +} diff --git a/libertem_asi_mpx3/src/common.rs b/libertem_asi_mpx3/src/common.rs new file mode 100644 index 00000000..f02593f0 --- /dev/null +++ b/libertem_asi_mpx3/src/common.rs @@ -0,0 +1,57 @@ +use pyo3::{pyclass, pymethods}; +use serde::{Deserialize, Serialize}; + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +#[pyclass] +pub enum DType { + U8, + U16, +} + +impl DType { + pub fn from_maxval(maxval: u32) -> Self { + if maxval < 256 { + DType::U8 + } else { + DType::U16 + } + } + + pub fn num_bytes(&self) -> usize { + match self { + DType::U8 => 1, + DType::U16 => 2, + } + } +} + +#[pymethods] +impl DType { + fn as_string(&self) -> String { + match self { + Self::U8 => "uint8".to_string(), + Self::U16 => "uint16".to_string(), + } + } +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub struct FrameMeta { + pub sequence: u64, + pub dtype: DType, + pub width: u16, + pub height: u16, + + /// The exact length of the data for this frame + pub data_length_bytes: usize, + + /// The length of the header in bytes + pub header_length_bytes: usize, +} + +impl FrameMeta { + /// Get the number of elements in this frame (`prod(shape)`) + pub fn get_size(&self) -> u64 { + self.width as u64 * self.height as u64 + } +} diff --git a/libertem_asi_mpx3/src/exceptions.rs b/libertem_asi_mpx3/src/exceptions.rs new file mode 100644 index 00000000..546af860 --- /dev/null +++ b/libertem_asi_mpx3/src/exceptions.rs @@ -0,0 +1,15 @@ +use pyo3::{create_exception, exceptions}; + +create_exception!( + libertem_dectris, + TimeoutError, + exceptions::PyException, + "Timeout while communicating" +); + +create_exception!( + libertem_dectris, + ConnectionError, + exceptions::PyException, + "SHM Connection failed" +); diff --git a/libertem_asi_mpx3/src/frame_stack.rs b/libertem_asi_mpx3/src/frame_stack.rs new file mode 100644 index 00000000..ba03bb7e --- /dev/null +++ b/libertem_asi_mpx3/src/frame_stack.rs @@ -0,0 +1,235 @@ +use bincode::serialize; +use ipc_test::{SharedSlabAllocator, SlotForWriting, SlotInfo}; +use pyo3::{ + exceptions::{PyRuntimeError, PyValueError}, + prelude::*, + types::{PyBytes, PyType}, +}; +use serde::{Deserialize, Serialize}; + +use crate::common::FrameMeta; + +pub struct FrameStackForWriting { + slot: SlotForWriting, + meta: Vec, + + /// where in the slot do the frames begin? this can be unevenly spaced + offsets: Vec, + + /// offset where the next frame will be written + pub(crate) cursor: usize, + + /// number of bytes reserved for each frame + /// as some frames compress better or worse, this is just the "planning" number + bytes_per_frame: usize, +} + +impl FrameStackForWriting { + pub fn new(slot: SlotForWriting, capacity: usize, bytes_per_frame: usize) -> Self { + FrameStackForWriting { + slot, + cursor: 0, + bytes_per_frame, + // reserve a bit more, as we don't know the upper bound of frames + // per stack and using a bit more memory is better than having to + // resize the vectors + meta: Vec::with_capacity(2 * capacity), + offsets: Vec::with_capacity(2 * capacity), + } + } + + pub fn len(&self) -> usize { + self.meta.len() + } + + pub fn can_fit(&self, num_bytes: usize) -> bool { + self.slot.size - self.cursor >= num_bytes + } + + pub fn slot_size(&self) -> usize { + self.slot.size + } + + pub fn bytes_free(&self) -> usize { + self.slot.size - self.cursor + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Temporarily take mutable ownership of a buffer + /// for a single frame, and receive the data into it. + pub fn write_frame( + &mut self, + meta: &FrameMeta, + mut fill_buffer: impl FnMut(&mut [u8]) -> Result<(), E>, + ) -> Result<(), E> { + // FIXME: `fill_buffer` should return a `Result` + let start = self.cursor; + let stop = start + meta.data_length_bytes; + let dest = &mut self.slot.as_slice_mut()[start..stop]; + fill_buffer(dest)?; + + self.meta.push(meta.clone()); + self.offsets.push(self.cursor); + self.cursor += meta.data_length_bytes; + + Ok(()) + } + + pub fn writing_done(self, shm: &mut SharedSlabAllocator) -> FrameStackHandle { + let slot_info = shm.writing_done(self.slot); + + FrameStackHandle { + slot: slot_info, + meta: self.meta, + offsets: self.offsets, + bytes_per_frame: self.bytes_per_frame, + } + } +} + +/// serializable handle for a stack of frames that live in shm +#[pyclass] +#[derive(PartialEq, Eq, Serialize, Deserialize, Debug)] +pub struct FrameStackHandle { + pub(crate) slot: SlotInfo, + meta: Vec, + pub(crate) offsets: Vec, + pub(crate) bytes_per_frame: usize, +} + +impl FrameStackHandle { + pub fn len(&self) -> usize { + self.meta.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// total number of _useful_ bytes in this frame stack + pub fn payload_size(&self) -> usize { + self.meta.iter().map(|fm| fm.data_length_bytes).sum() + } + + /// total number of bytes allocated for the slot + pub fn slot_size(&self) -> usize { + self.slot.size + } + + pub fn get_meta(&self) -> &Vec { + &self.meta + } + + pub(crate) fn deserialize_impl(serialized: &PyBytes) -> PyResult { + let data = serialized.as_bytes(); + bincode::deserialize(data).map_err(|e| { + let msg = format!("could not deserialize FrameStackHandle: {e:?}"); + PyRuntimeError::new_err(msg) + }) + } + + pub fn get_slice_for_frame<'a>(&'a self, frame_idx: usize, slot: &'a ipc_test::Slot) -> &[u8] { + let slice = slot.as_slice(); + let in_offset = self.offsets[frame_idx]; + let size = self.meta[frame_idx].data_length_bytes; + &slice[in_offset..in_offset + size] + } + + /// Split self at `mid` and create two new `FrameStackHandle`s. + /// The first will contain frames with indices [0..mid), the second [mid..len)` + pub fn split_at(self, mid: usize, shm: &mut SharedSlabAllocator) -> (Self, Self) { + // FIXME: this whole thing is falliable, so modify return type to Result<> (or PyResult<>?) + let bytes_mid = self.offsets[mid]; + let (left, right) = { + let slot: ipc_test::Slot = shm.get(self.slot.slot_idx); + let slice = slot.as_slice(); + + let mut slot_left = shm.get_mut().expect("shm slot for writing"); + let slice_left = slot_left.as_slice_mut(); + + let mut slot_right = shm.get_mut().expect("shm slot for writing"); + let slice_right = slot_right.as_slice_mut(); + + slice_left[..bytes_mid].copy_from_slice(&slice[..bytes_mid]); + slice_right[..(slice.len() - bytes_mid)].copy_from_slice(&slice[bytes_mid..]); + + let left = shm.writing_done(slot_left); + let right = shm.writing_done(slot_right); + + shm.free_idx(self.slot.slot_idx); + + (left, right) + }; + + let (left_meta, right_meta) = self.meta.split_at(mid); + let (left_offsets, right_offsets) = self.offsets.split_at(mid); + + ( + FrameStackHandle { + slot: left, + meta: left_meta.to_vec(), + offsets: left_offsets.to_vec(), + bytes_per_frame: self.bytes_per_frame, + }, + FrameStackHandle { + slot: right, + meta: right_meta.to_vec(), + offsets: right_offsets.iter().map(|o| o - bytes_mid).collect(), + bytes_per_frame: self.bytes_per_frame, + }, + ) + } + + fn first_meta(&self) -> PyResult<&FrameMeta> { + self.meta.first().map_or_else( + || Err(PyValueError::new_err("empty frame stack".to_string())), + Ok, + ) + } +} + +#[pymethods] +impl FrameStackHandle { + pub fn serialize(&self, py: Python) -> PyResult> { + let bytes: &PyBytes = PyBytes::new(py, serialize(self).unwrap().as_slice()); + Ok(bytes.into()) + } + + #[classmethod] + fn deserialize(_cls: &PyType, serialized: &PyBytes) -> PyResult { + Self::deserialize_impl(serialized) + } + + fn get_frame_id(slf: PyRef) -> PyResult { + Ok(slf.first_meta()?.sequence) + } + + fn get_shape(slf: PyRef) -> PyResult<(u16, u16)> { + let meta = slf.first_meta()?; + Ok((meta.height, meta.width)) + } + + fn __len__(slf: PyRef) -> usize { + slf.len() + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::FrameStackForWriting; + use ipc_test::{SharedSlabAllocator, Slot}; + use tempfile::{tempdir, TempDir}; + + fn get_socket_path() -> (TempDir, PathBuf) { + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.path().join("stuff.socket"); + + (socket_dir, socket_as_path) + } +} diff --git a/libertem_asi_mpx3/src/lib.rs b/libertem_asi_mpx3/src/lib.rs new file mode 100644 index 00000000..df7da9b8 --- /dev/null +++ b/libertem_asi_mpx3/src/lib.rs @@ -0,0 +1,7 @@ +pub mod cam_client; +pub mod common; +pub mod exceptions; +pub mod frame_stack; +pub mod main_py; +pub mod receiver; +pub mod stats; diff --git a/libertem_asi_mpx3/src/main_py.rs b/libertem_asi_mpx3/src/main_py.rs new file mode 100644 index 00000000..3f50d5f3 --- /dev/null +++ b/libertem_asi_mpx3/src/main_py.rs @@ -0,0 +1,423 @@ +#![allow(clippy::borrow_deref_ref)] + +use std::{ + convert::Infallible, + path::PathBuf, + time::{Duration, Instant}, +}; + +use crate::{ + cam_client::CamClient, + common::{DType, FrameMeta}, + exceptions::{ConnectionError, TimeoutError}, + frame_stack::FrameStackHandle, + receiver::{ReceiverStatus, ResultMsg, ServalReceiver}, +}; + +use ipc_test::SharedSlabAllocator; +use log::{info, trace}; +use pyo3::{ + exceptions::{self, PyRuntimeError}, + prelude::*, +}; +use serval_client::{DetectorConfig, DetectorInfo, DetectorLayout, ServalClient}; +use stats::Stats; + +#[pymodule] +fn libertem_asi_mpx3(py: Python, m: &PyModule) -> PyResult<()> { + // FIXME: logging integration deadlocks on close(), when trying to acquire + // the GIL + // pyo3_log::init(); + + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add("TimeoutError", py.get_type::())?; + + register_header_module(py, m)?; + + let env = env_logger::Env::default() + .filter_or("LIBERTEM_ASI_LOG_LEVEL", "error") + .write_style_or("LIBERTEM_ASI_LOG_STYLE", "always"); + env_logger::Builder::from_env(env) + .format_timestamp_micros() + .init(); + + Ok(()) +} + +fn register_header_module(py: Python<'_>, parent_module: &PyModule) -> PyResult<()> { + let headers_module = PyModule::new(py, "headers")?; + parent_module.add_submodule(headers_module)?; + Ok(()) +} + +#[derive(Debug)] +#[pyclass(name = "DetectorConfig")] +struct PyDetectorConfig { + config: DetectorConfig, +} + +#[pymethods] +impl PyDetectorConfig { + fn __repr__(&self) -> String { + format!("{:?}", self) + } + + fn get_n_triggers(&self) -> u64 { + self.config.n_triggers + } +} + +#[derive(Debug)] +#[pyclass(name = "DetectorInfo")] +struct PyDetectorInfo { + info: DetectorInfo, +} + +#[pymethods] +impl PyDetectorInfo { + fn __repr__(&self) -> String { + format!("{:?}", self) + } + + fn get_pix_count(&self) -> u64 { + self.info.pix_count + } +} + +#[derive(Debug)] +#[pyclass(name = "DetectorLayout")] +struct PyDetectorLayout { + info: DetectorLayout, +} + +#[pymethods] +impl PyDetectorLayout { + fn __repr__(&self) -> String { + format!("{:?}", self) + } +} + +#[pyclass(name = "ServalAPIClient")] +struct PyServalClient { + client: ServalClient, + base_url: String, +} + +#[pymethods] +impl PyServalClient { + #[new] + fn new(base_url: &str) -> Self { + Self { + client: ServalClient::new(base_url), + base_url: base_url.to_string(), + } + } + + fn __repr__(&self) -> String { + format!("", self.base_url) + } + + fn get_detector_config(&self) -> PyResult { + self.client + .get_detector_config() + .map_err(|e| PyRuntimeError::new_err(e.to_string())) + .map(|value| PyDetectorConfig { config: value }) + } + + fn get_detector_info(&self) -> PyResult { + self.client + .get_detector_info() + .map_err(|e| PyRuntimeError::new_err(e.to_string())) + .map(|value| PyDetectorInfo { info: value }) + } +} + +struct FrameChunkedIterator<'a, 'b, 'c, 'd> { + receiver: &'a mut ServalReceiver, + shm: &'b mut SharedSlabAllocator, + remainder: &'c mut Vec, + stats: &'d mut Stats, +} + +impl<'a, 'b, 'c, 'd> FrameChunkedIterator<'a, 'b, 'c, 'd> { + /// Get the next frame stack. Mainly handles splitting logic for boundary + /// conditions and delegates communication with the background thread to `recv_next_stack_impl` + pub fn get_next_stack_impl( + &mut self, + py: Python, + max_size: usize, + ) -> PyResult> { + let res = self.recv_next_stack_impl(py); + match res { + Ok(Some(frame_stack)) => { + if frame_stack.len() > max_size { + // split `FrameStackHandle` into two: + trace!( + "FrameStackHandle::split_at({max_size}); len={}", + frame_stack.len() + ); + self.stats.count_split(); + let (left, right) = frame_stack.split_at(max_size, self.shm); + self.remainder.push(right); + assert!(left.len() <= max_size); + return Ok(Some(left)); + } + assert!(frame_stack.len() <= max_size); + Ok(Some(frame_stack)) + } + Ok(None) => Ok(None), + e @ Err(_) => e, + } + } + + /// Receive the next frame stack from the background thread and handle any + /// other control messages. + fn recv_next_stack_impl(&mut self, py: Python) -> PyResult> { + // first, check if there is anything on the remainder list: + if let Some(frame_stack) = self.remainder.pop() { + return Ok(Some(frame_stack)); + } + + match self.receiver.status { + ReceiverStatus::Closed => { + return Err(exceptions::PyRuntimeError::new_err("receiver is closed")) + } + ReceiverStatus::Idle => return Ok(None), + ReceiverStatus::Running => {} + } + + let recv = &mut self.receiver; + + loop { + py.check_signals()?; + + let recv_result = py.allow_threads(|| { + let next: Result, Infallible> = + Ok(recv.next_timeout(Duration::from_millis(100))); + next + })?; + + match recv_result { + None => { + continue; + } + Some(ResultMsg::ParseError { msg }) => { + return Err(exceptions::PyRuntimeError::new_err(msg)) + } + Some(ResultMsg::AcquisitionStart { + detector_config: _, + first_frame_meta: _, + }) => { + // FIXME: in case of "passive" mode, we should actually not hit this, + // as the "outer" structure (`ServalConnection`) handles it? + continue; + } + Some(ResultMsg::Error { msg }) => { + return Err(exceptions::PyRuntimeError::new_err(msg)) + } + Some(ResultMsg::End { frame_stack }) => { + self.stats.log_stats(); + self.stats.reset(); + return Ok(Some(frame_stack)); + } + Some(ResultMsg::FrameStack { frame_stack }) => { + return Ok(Some(frame_stack)); + } + } + } + } + + fn new( + receiver: &'a mut ServalReceiver, + shm: &'b mut SharedSlabAllocator, + remainder: &'c mut Vec, + stats: &'d mut Stats, + ) -> PyResult { + Ok(FrameChunkedIterator { + receiver, + shm, + remainder, + stats, + }) + } +} + +#[pyclass] +struct ServalConnection { + receiver: ServalReceiver, + remainder: Vec, + local_shm: SharedSlabAllocator, + stats: Stats, +} + +impl ServalConnection { + fn start_passive_impl(&mut self) -> PyResult<()> { + self.receiver + .start_passive() + .map_err(|err| exceptions::PyRuntimeError::new_err(err.msg)) + } + + fn close_impl(&mut self) { + self.receiver.close(); + } +} + +#[pyclass] +struct PendingAcquisition { + config: DetectorConfig, + first_frame_meta: FrameMeta, +} + +#[pymethods] +impl PendingAcquisition { + fn get_detector_config(&self) -> PyDetectorConfig { + PyDetectorConfig { + config: self.config.clone(), + } + } + + fn get_frame_width(&self) -> u16 { + self.first_frame_meta.width + } + + fn get_frame_height(&self) -> u16 { + self.first_frame_meta.height + } +} + +#[pymethods] +impl ServalConnection { + #[new] + fn new( + data_uri: &str, + api_uri: &str, + frame_stack_size: usize, + handle_path: String, + num_slots: Option, + bytes_per_frame: Option, + huge: Option, + ) -> PyResult { + let num_slots = num_slots.map_or_else(|| 2000, |x| x); + let bytes_per_frame = bytes_per_frame.map_or_else(|| 512 * 512 * 2, |x| x); + let slot_size = frame_stack_size * bytes_per_frame; + let shm = match SharedSlabAllocator::new( + num_slots, + slot_size, + huge.map_or_else(|| false, |x| x), + &PathBuf::from(handle_path), + ) { + Ok(shm) => shm, + Err(e) => { + let total_size = num_slots * slot_size; + let msg = format!("could not create SHM area (num_slots={num_slots}, slot_size={slot_size} total_size={total_size} huge={huge:?}): {e:?}"); + return Err(ConnectionError::new_err(msg)); + } + }; + + let local_shm = shm.clone_and_connect().expect("clone SHM"); + + Ok(Self { + receiver: ServalReceiver::new(data_uri, api_uri, frame_stack_size, shm), + remainder: Vec::new(), + local_shm, + stats: Stats::new(), + }) + } + + /// Wait until the detector is armed, or until the timeout expires (in seconds) + /// Returns `None` in case of timeout, the detector config otherwise. + /// This method drops the GIL to allow concurrent Python threads. + fn wait_for_arm(&mut self, timeout: f32, py: Python) -> PyResult> { + let timeout = Duration::from_secs_f32(timeout); + let deadline = Instant::now() + timeout; + let step = Duration::from_millis(100); + + loop { + py.check_signals()?; + + let res = py.allow_threads(|| { + let timeout_rem = deadline - Instant::now(); + let this_timeout = timeout_rem.min(step); + self.receiver.next_timeout(this_timeout) + }); + + match res { + Some(ResultMsg::AcquisitionStart { + detector_config, + first_frame_meta, + }) => { + return Ok(Some(PendingAcquisition { + config: detector_config, + first_frame_meta, + })) + } + msg @ Some(ResultMsg::End { .. }) | msg @ Some(ResultMsg::FrameStack { .. }) => { + let err = format!("unexpected message: {:?}", msg); + return Err(PyRuntimeError::new_err(err)); + } + Some(ResultMsg::ParseError { msg }) => return Err(PyRuntimeError::new_err(msg)), + Some(ResultMsg::Error { msg }) => return Err(PyRuntimeError::new_err(msg)), + None => { + // timeout + if Instant::now() > deadline { + return Ok(None); + } else { + continue; + } + } + } + } + } + + fn get_socket_path(&self) -> PyResult { + Ok(self.local_shm.get_handle().os_handle) + } + + fn is_running(slf: PyRef) -> bool { + slf.receiver.status == ReceiverStatus::Running + } + + /// Start listening for global acquisition headers on the zeromq socket. + /// Call `wait_for_arm` to wait + fn start_passive(mut slf: PyRefMut) -> PyResult<()> { + slf.start_passive_impl() + } + + fn close(mut slf: PyRefMut) { + slf.stats.log_stats(); + slf.stats.reset(); + slf.close_impl(); + } + + fn get_next_stack( + &mut self, + py: Python, + max_size: usize, + ) -> PyResult> { + let mut iter = FrameChunkedIterator::new( + &mut self.receiver, + &mut self.local_shm, + &mut self.remainder, + &mut self.stats, + )?; + iter.get_next_stack_impl(py, max_size).map(|maybe_stack| { + if let Some(frame_stack) = &maybe_stack { + self.stats.count_stats_item(frame_stack); + } + maybe_stack + }) + } + + fn log_shm_stats(&self) { + let free = self.local_shm.num_slots_free(); + let total = self.local_shm.num_slots_total(); + self.stats.log_stats(); + info!("shm stats free/total: {}/{}", free, total); + } +} diff --git a/libertem_asi_mpx3/src/receiver.rs b/libertem_asi_mpx3/src/receiver.rs new file mode 100644 index 00000000..ea66408a --- /dev/null +++ b/libertem_asi_mpx3/src/receiver.rs @@ -0,0 +1,741 @@ +use std::{ + fmt::{Debug, Display}, + io::{ErrorKind, Read}, + mem::replace, + net::TcpStream, + str::FromStr, + thread::JoinHandle, + time::{Duration, Instant}, +}; + +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError}; +use ipc_test::{SHMHandle, SharedSlabAllocator}; +use log::{debug, error, info, trace, warn}; +use serval_client::{DetectorConfig, ServalClient, ServalError}; + +use crate::{ + common::{DType, FrameMeta}, + frame_stack::{FrameStackForWriting, FrameStackHandle}, +}; + +#[derive(PartialEq, Debug)] +pub enum ResultMsg { + Error { + msg: String, + }, // generic error response, might need to specialize later + + /// The frame header failed to parse + ParseError { + msg: String, + }, + + AcquisitionStart { + detector_config: DetectorConfig, + first_frame_meta: FrameMeta, + }, + + /// A stack of frames, part of an acquisition + FrameStack { + frame_stack: FrameStackHandle, + }, + + /// The last stack of frames of an acquisition + /// (can possibly be empty!) + End { + frame_stack: FrameStackHandle, + }, +} + +#[derive(Debug)] +enum ParseError { + WrongMagic { got: [u8; 2] }, + Eof, + InvalidMaxVal { got: u32 }, + WhiteSpaceExpected { pos: usize, got: u8 }, +} + +pub enum ControlMsg { + StopThread, + + /// Wait for any acquisition to start on a given host/port + StartAcquisitionPassive, +} + +#[derive(PartialEq, Eq)] +pub enum ReceiverStatus { + Idle, + Running, + Closed, +} + +// FIXME: error return type +fn num_from_byte_slice(bytes: &[u8]) -> T +where + ::Err: Debug, +{ + // This should not become a bottleneck, but in case it does, + // there is the `atoi` crate, which provides this functionality + // without going via UTF8 first. + let s = std::str::from_utf8(bytes).unwrap(); + s.parse().unwrap() +} + +/// Peek and parse the first frame header +fn peek_header(stream: &mut TcpStream) -> Result { + let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE]; + // FIXME: error handling, timeout, ... + + let mut nbytes = 0; + + // Ugh.. wait until enough data is in the buffer + // possibly, the sender sends the header and payload separately, + // in which case we get only a short header, and we need to retry. + // All because we don't really know how large the header is supposed to be. + // This is broken for very small frames (where header+data < 512), + // so if an acquisition only contains <512 bytes in total, we will wait + // here indefinitely. + while nbytes < HEADER_BUF_SIZE { + nbytes = match stream.peek(&mut buf) { + Ok(n) => n, + Err(e) => return Err(AcquisitionError::ConnectionError { msg: e.to_string() }), + } + // FIXME: timeout!! + } + + Ok(parse_header(&buf, 0)?) +} + +const HEADER_BUF_SIZE: usize = 512; + +fn parse_header(buf: &[u8; HEADER_BUF_SIZE], sequence: u64) -> Result { + let mut pos: usize = 0; + + // Each PGM image consists of the following: + // • A "magic number" for identifying the file type. A pgm image's magic number is the two characters "P5". + // FIXME: error handling + if &buf[0..2] != b"P5" { + return Err(ParseError::WrongMagic { + got: [buf[0], buf[1]], + }); + } + + pos += 2; + + // • Whitespace (blanks, TABs, CRs, LFs). + while buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + + // • A width, formatted as ASCII characters in decimal. + let width_start = pos; + while !buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + let width: u16 = num_from_byte_slice(&buf[width_start..pos]); + + // • Whitespace. + while buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + + // • A height, again in ASCII decimal. + let height_start = pos; + while !buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + let height: u16 = num_from_byte_slice(&buf[height_start..pos]); + + // • Whitespace. + while buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + + // • The maximum gray value (Maxval), again in ASCII decimal. Must be less than 65536, and more than zero. + // really, more than zero? how do you represent an all-black image in PGM then? + let maxval_start = pos; + while !buf[pos].is_ascii_whitespace() && pos < HEADER_BUF_SIZE { + pos += 1; + } + let maxval: u32 = num_from_byte_slice(&buf[maxval_start..pos]); + + if !(0..65536).contains(&maxval) { + return Err(ParseError::InvalidMaxVal { got: maxval }); + } + + // • A single whitespace character (usually a newline). + if !buf[pos].is_ascii_whitespace() { + return Err(ParseError::WhiteSpaceExpected { pos, got: buf[pos] }); + } + + // • A raster of Height rows, [...] + let raster_start_pos = pos; + + let dtype: DType = DType::from_maxval(maxval); + let data_length_bytes = width as usize * height as usize * dtype.num_bytes(); + + let header_length_bytes: usize = pos + 1; + + let meta = FrameMeta { + sequence, + dtype, + width, + height, + data_length_bytes, + header_length_bytes, + }; + + trace!("frame header parsed: {meta:?}"); + + Ok(meta) +} + +/// Puts `new` into `right`, `right` into `left` and returns the old `left` +fn three_way_shift(left: &mut T, right: &mut T, new: T) -> T { + let old_right = replace(right, new); + replace(left, old_right) +} + +fn recv_frame( + sequence: u64, + stream: &mut TcpStream, + control_channel: &Receiver, + frame_stack: &mut FrameStackForWriting, + extra_frame_stack: &mut FrameStackForWriting, +) -> Result { + // TODO: + // - timeout handling + // - error handling (parsing, receiving) + + // 1) Read the first N bytes (512?) from the socket into a buffer, and parse the PGM header + + let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE]; + + // FIXME: need to have a timeout here! + // In the happy case, this succeeds, or we get a + // ConnectionReset/ConnectionAborted, but in case the network inbetween is + // going bad, we might block here indefinitely. But we must regularly check + // for control messages from the `control_channel`, which we can't do here + // like this. + match stream.read_exact(&mut buf) { + Ok(_) => {} + Err(e) => { + // any kind of connection error means something is gone bad + return Err(AcquisitionError::ConnectionError { msg: e.to_string() }); + } + } + + // 2) Parse the header, importantly reading width, height, bytes-per-pixel (maxval) + + let meta = parse_header(&buf, sequence)?; + + // 3) Now we know how large the binary part is. Copy the rest of the buffer + // into the frame stack, and read the remaining bytes directly into the + // shared memory. + + let fs = if frame_stack.can_fit(meta.data_length_bytes) { + frame_stack + } else { + trace!( + "frame_stack can't fit this frame: {} {}", + frame_stack.bytes_free(), + meta.data_length_bytes + ); + if !extra_frame_stack.is_empty() { + return Err(AcquisitionError::StateError { + msg: "extra_frame_stack should be empty".to_string(), + }); + } + if !extra_frame_stack.can_fit(meta.data_length_bytes) { + return Err(AcquisitionError::ConfigurationError { + msg: format!( + "extra_frame_stack can't fit frame; frame size {}, frame stack size {}", + meta.data_length_bytes, + extra_frame_stack.slot_size() + ), + }); + } + extra_frame_stack + }; + + fs.write_frame(&meta, |dest_buf| { + // copy the data after the header from our temporary stack buffer: + let head_src = &buf[meta.header_length_bytes..]; + dest_buf[0..head_src.len()].copy_from_slice(head_src); + + let dest_rest = &mut dest_buf[head_src.len()..]; + + // FIXME: this blocks - we need to check for control messages every now and then + match stream.read_exact(dest_rest) { + Ok(_) => Ok(()), + Err(e) => Err(AcquisitionError::ConnectionError { msg: e.to_string() }), + } + })?; + + Ok(meta) +} + +#[derive(Debug, Clone)] +enum AcquisitionError { + Disconnected, + Cancelled, + BufferFull, + StateError { msg: String }, + ConfigurationError { msg: String }, + ParseError { msg: String }, + ConnectionError { msg: String }, + APIError { msg: String }, +} + +impl Display for AcquisitionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AcquisitionError::Cancelled => { + write!(f, "acquisition cancelled") + } + AcquisitionError::Disconnected => { + write!(f, "other end has disconnected") + } + AcquisitionError::BufferFull => { + write!(f, "shm buffer is full") + } + AcquisitionError::StateError { msg } => { + write!(f, "state error: {msg}") + } + AcquisitionError::ConfigurationError { msg } => { + write!(f, "configuration error: {msg}") + } + AcquisitionError::ParseError { msg } => { + write!(f, "parse error: {msg}") + } + AcquisitionError::ConnectionError { msg } => { + write!(f, "connection error: {msg}") + } + AcquisitionError::APIError { msg } => { + write!(f, "serval HTTP API error: {msg}") + } + } + } +} + +impl From for AcquisitionError { + fn from(value: ParseError) -> Self { + AcquisitionError::ParseError { + msg: format!("{:?}", value), + } + } +} + +impl From> for AcquisitionError { + fn from(_value: SendError) -> Self { + AcquisitionError::Disconnected + } +} + +impl From for AcquisitionError { + fn from(value: ServalError) -> Self { + Self::APIError { + msg: value.to_string(), + } + } +} + +/// With a running acquisition, check for control messages; +/// especially convert `ControlMsg::StopThread` to `AcquisitionError::Cancelled`. +fn check_for_control(control_channel: &Receiver) -> Result<(), AcquisitionError> { + match control_channel.try_recv() { + Ok(ControlMsg::StartAcquisitionPassive) => Err(AcquisitionError::StateError { + msg: "received StartAcquisitionPassive while an acquisition was already running" + .to_string(), + }), + Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled), + Err(TryRecvError::Disconnected) => Err(AcquisitionError::Cancelled), + Err(TryRecvError::Empty) => Ok(()), + } +} + +/// Passively listen for the start of an acquisition +/// and automatically latch on to it. +fn passive_acquisition( + control_channel: &Receiver, + from_thread_s: &Sender, + frame_stack_size: usize, + data_uri: &str, + api_uri: &str, + shm: &mut SharedSlabAllocator, +) -> Result<(), AcquisitionError> { + let client = ServalClient::new(api_uri); + + loop { + trace!("connecting to {data_uri}..."); + check_for_control(control_channel)?; + let mut stream: TcpStream = match TcpStream::connect(data_uri) { + Ok(s) => s, + Err(e) => match e.kind() { + ErrorKind::ConnectionRefused + | ErrorKind::TimedOut + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset => { + // If we re-connect too fast after an acquisition, the + // connection might succeed and then be closed from the + // other end. That's why we have to handle Connection{Aborted,Reset} + // here. + std::thread::sleep(Duration::from_millis(10)); + continue; + } + _ => return Err(AcquisitionError::ConnectionError { msg: e.to_string() }), + }, + }; + + // block until we get the first frame: + let first_frame_meta = match peek_header(&mut stream) { + Ok(m) => m, + Err(AcquisitionError::ConnectionError { msg }) => { + warn!("connection error while peeking first frame: {msg}; reconnecting"); + continue; + } + Err(e) => return Err(e), + }; + + // then, we should be able to reliably get the detector config + // (we assume once data arrives, the config is immutable) + let detector_config = client.get_detector_config()?; + + acquisition( + control_channel, + from_thread_s, + &detector_config, + &first_frame_meta, + &mut stream, + frame_stack_size, + shm, + )?; + + let free = shm.num_slots_free(); + let total = shm.num_slots_total(); + info!("passive acquisition done; free slots: {}/{}", free, total); + + check_for_control(control_channel)?; + } +} + +fn acquisition( + to_thread_r: &Receiver, + from_thread_s: &Sender, + detector_config: &DetectorConfig, + first_frame_meta: &FrameMeta, + stream: &mut TcpStream, + frame_stack_size: usize, + shm: &mut SharedSlabAllocator, +) -> Result<(), AcquisitionError> { + let t0 = Instant::now(); + let mut last_control_check = Instant::now(); + + from_thread_s.send(ResultMsg::AcquisitionStart { + detector_config: detector_config.clone(), + first_frame_meta: first_frame_meta.clone(), + })?; + + debug!("acquisition starting"); + + // approx uppper bound of image size in bytes + let peek_meta = peek_header(stream)?; + let approx_size_bytes = 2 * peek_meta.get_size(); + + let slot = match shm.get_mut() { + None => return Err(AcquisitionError::BufferFull), + Some(x) => x, + }; + let mut frame_stack = + FrameStackForWriting::new(slot, frame_stack_size, approx_size_bytes as usize); + + // in case the frame stack is full, the receiving function needs + // an alternative destination: + let extra_slot = match shm.get_mut() { + None => return Err(AcquisitionError::BufferFull), + Some(x) => x, + }; + let mut extra_frame_stack = + FrameStackForWriting::new(extra_slot, frame_stack_size, approx_size_bytes as usize); + + debug!("starting receive loop"); + + let mut sequence = 0; + + loop { + if last_control_check.elapsed() > Duration::from_millis(300) { + last_control_check = Instant::now(); + check_for_control(to_thread_r)?; + trace!("acquisition progress: sequence={sequence}"); + } + + recv_frame( + sequence, + stream, + to_thread_r, + &mut frame_stack, + &mut extra_frame_stack, + )?; + sequence += 1; + + // If `recv_frame` had to use `extra_frame_stack`, `frame_stack` is + // finished and we need to exchange the stacks: + if !extra_frame_stack.is_empty() { + trace!("got something in `extra_frame_stack`, swapping things around..."); + // approx. the following is happening here: + // 1) to_send <- frame_stack + // 2) frame_stack <- extra_frame_stack + // 3) extra_frame_stack <- new_frame_stack() + + let to_send = { + let slot = match shm.get_mut() { + None => return Err(AcquisitionError::BufferFull), + Some(x) => x, + }; + let new_frame_stack = + FrameStackForWriting::new(slot, frame_stack_size, approx_size_bytes as usize); + + let old_frame_stack = + three_way_shift(&mut frame_stack, &mut extra_frame_stack, new_frame_stack); + old_frame_stack.writing_done(shm) + }; + // send to our queue: + from_thread_s.send(ResultMsg::FrameStack { + frame_stack: to_send, + })?; + } + + // we will be done after this frame: + let done = sequence == detector_config.n_triggers; + + if done { + let elapsed = t0.elapsed(); + info!("done in {elapsed:?}"); + + let handle = frame_stack.writing_done(shm); + from_thread_s.send(ResultMsg::End { + frame_stack: handle, + })?; + + if !extra_frame_stack.is_empty() { + let handle = extra_frame_stack.writing_done(shm); + from_thread_s.send(ResultMsg::End { + frame_stack: handle, + })?; + } else { + // let's not leak the `extra_frame_stack`: + // FIXME: `FrameStackForWriting` should really free itself, + // if `writing_done` was not called manually, which might happen + // in case of error handling. + // ah, but it can't, because it doesn't have a reference to `shm`! hmm + let handle = extra_frame_stack.writing_done(shm); + shm.free_idx(handle.slot.slot_idx); + } + + return Ok(()); + } + } +} + +/// convert `AcquisitionError`s to messages on `from_threads_s` +fn background_thread_wrap( + to_thread_r: &Receiver, + from_thread_s: &Sender, + data_uri: &str, + api_uri: &str, + frame_stack_size: usize, + shm: SharedSlabAllocator, +) { + if let Err(err) = background_thread( + to_thread_r, + from_thread_s, + data_uri, + api_uri, + frame_stack_size, + shm, + ) { + log::error!("background_thread err'd: {}", err.to_string()); + // NOTE: `shm` is dropped in case of an error, so anyone who tries to connect afterwards + // will get an error + from_thread_s + .send(ResultMsg::Error { + msg: err.to_string(), + }) + .unwrap(); + } +} + +fn background_thread( + to_thread_r: &Receiver, + from_thread_s: &Sender, + data_uri: &str, + api_uri: &str, + frame_stack_size: usize, + mut shm: SharedSlabAllocator, +) -> Result<(), AcquisitionError> { + 'outer: loop { + loop { + // control: main threads tells us to quit + let control = to_thread_r.recv_timeout(Duration::from_millis(100)); + match control { + Ok(ControlMsg::StartAcquisitionPassive) => { + match passive_acquisition( + to_thread_r, + from_thread_s, + frame_stack_size, + data_uri, + api_uri, + &mut shm, + ) { + Ok(_) => {} + Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + return Ok(()); + } + Err(e) => { + let msg = format!("passive_acquisition error: {}", e); + from_thread_s.send(ResultMsg::Error { msg }).unwrap(); + error!("background_thread: error: {}; re-connecting", e); + continue 'outer; + } + } + } + Ok(ControlMsg::StopThread) => { + debug!("background_thread: got a StopThread message"); + break 'outer; + } + Err(RecvTimeoutError::Disconnected) => { + debug!("background_thread: control channel has disconnected"); + break 'outer; + } + Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do + } + } + } + debug!("background_thread: is done"); + Ok(()) +} + +pub struct ReceiverError { + pub msg: String, +} + +impl Display for ReceiverError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let msg = &self.msg; + write!(f, "{msg}") + } +} + +/// Start a background thread that receives data from the socket and +/// puts it into shared memory. +pub struct ServalReceiver { + bg_thread: Option>, + to_thread: Sender, + from_thread: Receiver, + pub status: ReceiverStatus, + pub shm_handle: SHMHandle, +} + +impl ServalReceiver { + pub fn new( + data_uri: &str, + api_uri: &str, + frame_stack_size: usize, + shm: SharedSlabAllocator, + ) -> Self { + let (to_thread_s, to_thread_r) = unbounded(); + let (from_thread_s, from_thread_r) = unbounded(); + + let builder = std::thread::Builder::new(); + let data_uri = data_uri.to_string(); + let api_uri = api_uri.to_string(); + + let shm_handle = shm.get_handle(); + + ServalReceiver { + bg_thread: Some( + builder + .name("bg_thread".to_string()) + .spawn(move || { + background_thread_wrap( + &to_thread_r, + &from_thread_s, + &data_uri, + &api_uri, + frame_stack_size, + shm, + ) + }) + .expect("failed to start background thread"), + ), + from_thread: from_thread_r, + to_thread: to_thread_s, + status: ReceiverStatus::Idle, + shm_handle, + } + } + + fn adjust_status(&mut self, msg: &ResultMsg) { + match msg { + ResultMsg::AcquisitionStart { .. } => { + self.status = ReceiverStatus::Running; + } + ResultMsg::End { .. } => { + self.status = ReceiverStatus::Idle; + } + _ => {} + } + } + + pub fn recv(&mut self) -> ResultMsg { + let result_msg = self + .from_thread + .recv() + .expect("background thread should be running"); + self.adjust_status(&result_msg); + result_msg + } + + pub fn next_timeout(&mut self, timeout: Duration) -> Option { + let result_msg = self.from_thread.recv_timeout(timeout); + + match result_msg { + Ok(result) => { + self.adjust_status(&result); + Some(result) + } + Err(e) => match e { + RecvTimeoutError::Disconnected => { + panic!("background thread should be running") + } + RecvTimeoutError::Timeout => None, + }, + } + } + + pub fn start_passive(&mut self) -> Result<(), ReceiverError> { + if self.status == ReceiverStatus::Closed { + return Err(ReceiverError { + msg: "receiver is closed".to_string(), + }); + } + self.to_thread + .send(ControlMsg::StartAcquisitionPassive) + .expect("background thread should be running"); + self.status = ReceiverStatus::Running; + Ok(()) + } + + pub fn close(&mut self) { + if self.to_thread.send(ControlMsg::StopThread).is_err() { + warn!("could not stop background thread, probably already dead"); + } + if let Some(join_handle) = self.bg_thread.take() { + join_handle + .join() + .expect("could not join background thread!"); + } else { + warn!("did not have a bg thread join handle, cannot join!"); + } + self.status = ReceiverStatus::Closed; + } +} diff --git a/libertem_asi_mpx3/src/stats.rs b/libertem_asi_mpx3/src/stats.rs new file mode 100644 index 00000000..04b97b39 --- /dev/null +++ b/libertem_asi_mpx3/src/stats.rs @@ -0,0 +1,31 @@ +use stats::GetStats; + +use crate::frame_stack::FrameStackHandle; + +impl GetStats for FrameStackHandle { + fn payload_size(&self) -> usize { + self.payload_size() + } + + fn slot_size(&self) -> usize { + self.slot_size() + } + + fn max_frame_size(&self, old_max: usize) -> usize { + self.get_meta() + .iter() + .max_by_key(|fm| fm.data_length_bytes) + .map_or(old_max, |fm| fm.data_length_bytes) + } + + fn min_frame_size(&self, old_min: usize) -> usize { + self.get_meta() + .iter() + .min_by_key(|fm| fm.data_length_bytes) + .map_or(old_min, |fm| fm.data_length_bytes) + } + + fn num_frames(&self) -> usize { + self.len() + } +} diff --git a/libertem_asi_tpx3/examples/live_server.py b/libertem_asi_tpx3/examples/live_server.py index 0894ef56..97878571 100644 --- a/libertem_asi_tpx3/examples/live_server.py +++ b/libertem_asi_tpx3/examples/live_server.py @@ -17,6 +17,8 @@ from libertem.udf.sum import SumUDF from libertem.udf.sumsigudf import SumSigUDF from libertem.udf.masks import ApplyMasksUDF +from libertem.udf.com import COMParams +from libertem_icom.udf.icom import ICOMUDF from libertem.executor.pipelined import PipelinedExecutor from libertem_live.api import LiveContext from libertem_live.udf.monitor import ( @@ -126,9 +128,12 @@ def _ring(): radius_inner=ri) mask_udf = SingleMaskUDF(mask_factories=[_ring]) + params = COMParams(cy=cy, cx=cx, r=ro, ri=ri) + icom_udf = ICOMUDF(params) return OrderedDict({ # "brightfield": SumSigUDF(), "annular": mask_udf, + "icom": icom_udf, # "sum": SumUDF(), # "monitor": SignalMonitorUDF(), "monitor_partition": PartitionMonitorUDF(), @@ -186,6 +191,9 @@ async def make_deltas(self, partial_results: UDFResults, previous_results: typin udf_name = udf_names[idx] for channel_name in partial_results.buffers[idx].keys(): data = partial_results.buffers[idx][channel_name].data + # FIXME implement n-dimnsional result buffers + if len(data.shape) != 2: + continue if previous_results is None: data_previous = np.zeros_like(data) else: @@ -335,7 +343,7 @@ async def acquisition_loop(self): print(f"acquisition done with id={acq_id}; took {t1-t0:.3f}s") async def serve(self): - async with websockets.serve(self, "localhost", 8444): + async with websockets.serve(self, "0.0.0.0", 8444): try: await self.acquisition_loop() finally: diff --git a/serval-client/Cargo.toml b/serval-client/Cargo.toml new file mode 100644 index 00000000..813c9c02 --- /dev/null +++ b/serval-client/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "serval-client" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +reqwest = { version = "0.11.17", features = ["json", "blocking"] } +serde = { version = "1.0.163", features = ["derive"] } +serde_json = "1.0.96" +url = "2.3.1" diff --git a/serval-client/src/lib.rs b/serval-client/src/lib.rs new file mode 100644 index 00000000..006b764c --- /dev/null +++ b/serval-client/src/lib.rs @@ -0,0 +1,310 @@ +use std::fmt::Display; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use url::Url; + +#[derive(Debug)] +pub enum ServalError { + RequestFailed { msg: String }, + SerializationError { msg: String }, + URLError { msg: String }, +} + +impl From for ServalError { + fn from(value: url::ParseError) -> Self { + Self::URLError { + msg: value.to_string(), + } + } +} + +impl From for ServalError { + fn from(value: serde_json::Error) -> Self { + Self::SerializationError { + msg: value.to_string(), + } + } +} + +impl From for ServalError { + fn from(value: reqwest::Error) -> Self { + Self::RequestFailed { + msg: value.to_string(), + } + } +} + +impl Display for ServalError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ServalError::RequestFailed { msg } => write!(f, "request failed: {msg}"), + ServalError::SerializationError { msg } => write!(f, "serialization error: {msg}"), + ServalError::URLError { msg } => write!(f, "URL error: {msg}"), + } + } +} + +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +pub enum TriggerMode { + /// Start: Positive Edge External Trigger Input, Stop: Negative Edge + #[serde(rename = "PEXSTART_NEXSTOP")] + PexStartNexStop, + + /// Start: Negative Edge External Trigger Input, Stop: Positive Edge + #[serde(rename = "NEXSTART_PEXSTOP")] + NexStartPexStop, + + /// Start: Positive Edge External Trigger Input, Stop: HW timer + #[serde(rename = "PEXSTART_TIMERSTOP")] + PexStartTimerStop, + + /// Start: Negative Edge External Trigger Input, Stop: HW timer + #[serde(rename = "NEXSTART_TIMERSTOP")] + NexStartTimerStop, + + #[serde(rename = "AUTOTRIGSTART_TIMERSTOP")] + AutoTriggerStartTimerStop, + + #[serde(rename = "CONTINUOUS")] + Continuous, + + #[serde(rename = "SOFTWARESTART_TIMERSTOP")] + SoftwareStartTimerStop, + + #[serde(rename = "SOFTWARESTART_SOFTWARESTOP")] + SoftwareStartSoftwareStop, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorConfig { + #[serde(rename = "BiasVoltage")] + pub bias_voltage: u64, + + #[serde(rename = "BiasEnabled")] + pub bias_enabled: bool, + + #[serde(rename = "nTriggers")] + pub n_triggers: u64, + + /// Exposure time in seconds + #[serde(rename = "ExposureTime")] + pub exposure_time: f32, + + /// Trigger period in seconds + #[serde(rename = "TriggerPeriod")] + pub trigger_period: f32, + + #[serde(rename = "TriggerMode")] + pub trigger_mode: TriggerMode, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorBoard { + #[serde(rename = "ChipboardId")] + pub chipboard_id: String, + + #[serde(rename = "IpAddress")] + pub ip_address: String, + + #[serde(rename = "FirmwareVersion")] + pub firmare_version: String, + + #[serde(rename = "Chips")] + pub chips: Vec, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorChip { + #[serde(rename = "Index")] + pub index: usize, + + #[serde(rename = "Id")] + pub id: u64, + + #[serde(rename = "Name")] + pub name: String, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorInfo { + #[serde(rename = "IfaceName")] + pub iface_name: String, + + #[serde(rename = "SW_version")] + pub sw_version: String, + + #[serde(rename = "FW_version")] + pub fw_version: String, + + #[serde(rename = "PixCount")] + pub pix_count: u64, + + #[serde(rename = "RowLen")] + pub row_len: u16, + + #[serde(rename = "NumberOfChips")] + pub number_of_chips: u16, + + #[serde(rename = "NumberOfRows")] + pub number_of_rows: u16, + + #[serde(rename = "MpxType")] + pub mpx_type: u64, + + #[serde(rename = "Boards")] + pub boards: Vec, + + #[serde(rename = "SuppAcqModes")] + pub supp_acq_modes: u64, + + #[serde(rename = "ClockReadout")] + pub clock_readout: f32, + + #[serde(rename = "MaxPulseCount")] + pub max_pulse_count: u64, + + #[serde(rename = "MaxPulseHeight")] + pub max_pulse_height: f32, + + #[serde(rename = "MaxPulsePeriod")] + pub max_pulse_period: f32, + + #[serde(rename = "TimerMaxVal")] + pub timer_max_val: f32, + + #[serde(rename = "TimerMinVal")] + pub timer_min_val: f32, + + #[serde(rename = "TimerStep")] + pub timer_step: f32, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub enum DetectorOrientation { + #[serde(rename = "UP")] + Up, + + #[serde(rename = "RIGHT")] + Right, + + #[serde(rename = "DOWN")] + Down, + + #[serde(rename = "LEFT")] + Left, + + #[serde(rename = "UP_MIRRORED")] + UpMirrored, + + #[serde(rename = "RIGHT_MIRRORED")] + RightMirrored, + + #[serde(rename = "DOWN_MIRRORED")] + DownMirrored, + + #[serde(rename = "LEFT_MIRRORED")] + LeftMirrored, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub enum ChipOrientation { + LtRBtT, + RtLBtT, + LtRTtB, + RtLTtB, + BtTLtR, + TtBLtR, + BtTRtL, + TtBRtL, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorLayout { + #[serde(rename = "Orientation")] + orientation: DetectorOrientation, + + #[serde(rename = "Original")] + original: DetectorLayoutInner, + + #[serde(rename = "Rotated")] + rotated: DetectorLayoutInner, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorLayoutChip { + #[serde(rename = "Chip")] + chip: u16, + + #[serde(rename = "X")] + x: u16, + + #[serde(rename = "Y")] + y: u16, + + #[serde(rename = "Orientation")] + orientation: ChipOrientation, +} + +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub struct DetectorLayoutInner { + #[serde(rename = "Width")] + width: u16, + + #[serde(rename = "Height")] + height: u16, + + #[serde(rename = "Chips")] + chips: Vec, +} + +pub struct ServalClient { + base_url: Url, +} + +impl ServalClient { + pub fn new(base_url: &str) -> Self { + ServalClient { + base_url: Url::parse(base_url).unwrap(), + } + } + + fn get_request(&self, path: &str) -> Result + where + T: DeserializeOwned, + { + let url = self.base_url.join(path)?; + let resp = reqwest::blocking::get(url)?; + let resp_text = resp.text()?; + let config: T = serde_json::from_str(&resp_text)?; + Ok(config) + } + + pub fn get_detector_config(&self) -> Result { + self.get_request("/detector/config/") + } + + pub fn get_detector_info(&self) -> Result { + self.get_request("/detector/info/") + } + + pub fn get_detector_layout(&self) -> Result { + self.get_request("/detector/layout/") + } + + pub fn start_measurement(&self) { + todo!(); + } +} + +#[cfg(test)] +mod test { + use crate::ServalClient; + + #[test] + fn test_stuff() { + let client = ServalClient::new("http://localhost:8080"); + + println!("{:?}", client.get_detector_config().unwrap()); + panic!("at the disco?"); + } +}