Skip to content

Commit

Permalink
Misc updates
Browse files Browse the repository at this point in the history
- microsecond timestamps for logging
- expose more detector info/config/layout to Python
- expose ServalClient to Python
- add first frame meta to AcquisitionStart message
- `wait_for_arm` returns `PendingAcquisition` with detector config and
  first frame meta info
- `serval-client`:
    - add more structs for detector info/config/layout
    - error handling: don't panic
  • Loading branch information
sk1p committed May 17, 2023
1 parent b47f5db commit abf4e5c
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 26 deletions.
7 changes: 5 additions & 2 deletions libertem_asi_mpx3/examples/mpx3_example_03_acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def init_acquisition(serverurl, detector_config):

# 0.1 second = 10fps
# 0.0005 second = 2000fps
dwell_time = 0.0001
# dwell_time = 0.0005
# dwell_time = 0.0001
dwell_time = 0.0005

# Sets the trigger period (time between triggers) in seconds.
detector_config["TriggerPeriod"] = dwell_time
Expand Down Expand Up @@ -193,6 +193,9 @@ def acquisition_test(serverurl):
data = response.text
print('Selected destination : ' + data)

print(f"Layout: {json.dumps(get_request(url=serverurl + '/detector/layout').json(), indent=2)}")
print(f"Info: {json.dumps(get_request(url=serverurl + '/detector/info').json(), indent=2)}")

# Running acquisition process
acquisition_test(serverurl)

Expand Down
5 changes: 4 additions & 1 deletion libertem_asi_mpx3/examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

conn.start_passive()

cam_client = None

try:
while True:
config = None
Expand Down Expand Up @@ -54,4 +56,5 @@
tq.close()
finally:
conn.close() # clean up background thread etc.
cam_client.close()
if cam_client is not None:
cam_client.close()
12 changes: 11 additions & 1 deletion libertem_asi_mpx3/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pyo3::pyclass;
use pyo3::{pyclass, pymethods};
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
Expand All @@ -25,6 +25,16 @@ impl DType {
}
}

#[pymethods]
impl DType {
fn as_string(&self) -> String {
match self {
Self::U8 => "uint8".to_string(),
Self::U16 => "uint16".to_string(),
}
}
}

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
pub struct FrameMeta {
pub sequence: u64,
Expand Down
115 changes: 107 additions & 8 deletions libertem_asi_mpx3/src/main_py.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use crate::{
cam_client::CamClient,
common::DType,
common::{DType, FrameMeta},
exceptions::{ConnectionError, TimeoutError},
frame_stack::FrameStackHandle,
receiver::{ReceiverStatus, ResultMsg, ServalReceiver},
Expand All @@ -20,7 +20,7 @@ use pyo3::{
exceptions::{self, PyRuntimeError},
prelude::*,
};
use serval_client::DetectorConfig;
use serval_client::{DetectorConfig, DetectorInfo, DetectorLayout, ServalClient};
use stats::Stats;

#[pymodule]
Expand All @@ -33,6 +33,8 @@ fn libertem_asi_mpx3(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<ServalConnection>()?;
m.add_class::<DType>()?;
m.add_class::<PyDetectorConfig>()?;
m.add_class::<PyDetectorInfo>()?;
m.add_class::<PyServalClient>()?;
m.add_class::<CamClient>()?;
m.add("TimeoutError", py.get_type::<TimeoutError>())?;

Expand All @@ -41,7 +43,9 @@ fn libertem_asi_mpx3(py: Python, m: &PyModule) -> PyResult<()> {
let env = env_logger::Env::default()
.filter_or("LIBERTEM_ASI_LOG_LEVEL", "error")
.write_style_or("LIBERTEM_ASI_LOG_STYLE", "always");
env_logger::init_from_env(env);
env_logger::Builder::from_env(env)
.format_timestamp_micros()
.init();

Ok(())
}
Expand Down Expand Up @@ -69,6 +73,71 @@ impl PyDetectorConfig {
}
}

#[derive(Debug)]
#[pyclass(name = "DetectorInfo")]
struct PyDetectorInfo {
info: DetectorInfo,
}

#[pymethods]
impl PyDetectorInfo {
fn __repr__(&self) -> String {
format!("{:?}", self)
}

fn get_pix_count(&self) -> u64 {
self.info.pix_count
}
}

#[derive(Debug)]
#[pyclass(name = "DetectorLayout")]
struct PyDetectorLayout {
info: DetectorLayout,
}

#[pymethods]
impl PyDetectorLayout {
fn __repr__(&self) -> String {
format!("{:?}", self)
}
}

#[pyclass(name = "ServalAPIClient")]
struct PyServalClient {
client: ServalClient,
base_url: String,
}

#[pymethods]
impl PyServalClient {
#[new]
fn new(base_url: &str) -> Self {
Self {
client: ServalClient::new(base_url),
base_url: base_url.to_string(),
}
}

fn __repr__(&self) -> String {
format!("<ServalClient base_url={}>", self.base_url)
}

fn get_detector_config(&self) -> PyResult<PyDetectorConfig> {
self.client
.get_detector_config()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
.map(|value| PyDetectorConfig { config: value })
}

fn get_detector_info(&self) -> PyResult<PyDetectorInfo> {
self.client
.get_detector_info()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
.map(|value| PyDetectorInfo { info: value })
}
}

struct FrameChunkedIterator<'a, 'b, 'c, 'd> {
receiver: &'a mut ServalReceiver,
shm: &'b mut SharedSlabAllocator,
Expand Down Expand Up @@ -139,9 +208,12 @@ impl<'a, 'b, 'c, 'd> FrameChunkedIterator<'a, 'b, 'c, 'd> {
continue;
}
Some(ResultMsg::ParseError { msg }) => {
todo!();
return Err(exceptions::PyRuntimeError::new_err(msg))
}
Some(ResultMsg::AcquisitionStart { detector_config }) => {
Some(ResultMsg::AcquisitionStart {
detector_config: _,
first_frame_meta: _,
}) => {
// FIXME: in case of "passive" mode, we should actually not hit this,
// as the "outer" structure (`ServalConnection`) handles it?
continue;
Expand Down Expand Up @@ -196,6 +268,29 @@ impl ServalConnection {
}
}

#[pyclass]
struct PendingAcquisition {
config: DetectorConfig,
first_frame_meta: FrameMeta,
}

#[pymethods]
impl PendingAcquisition {
fn get_detector_config(&self) -> PyDetectorConfig {
PyDetectorConfig {
config: self.config.clone(),
}
}

fn get_frame_width(&self) -> u16 {
self.first_frame_meta.width
}

fn get_frame_height(&self) -> u16 {
self.first_frame_meta.height
}
}

#[pymethods]
impl ServalConnection {
#[new]
Expand Down Expand Up @@ -238,7 +333,7 @@ impl ServalConnection {
/// Wait until the detector is armed, or until the timeout expires (in seconds)
/// Returns `None` in case of timeout, the detector config otherwise.
/// This method drops the GIL to allow concurrent Python threads.
fn wait_for_arm(&mut self, timeout: f32, py: Python) -> PyResult<Option<PyDetectorConfig>> {
fn wait_for_arm(&mut self, timeout: f32, py: Python) -> PyResult<Option<PendingAcquisition>> {
let timeout = Duration::from_secs_f32(timeout);
let deadline = Instant::now() + timeout;
let step = Duration::from_millis(100);
Expand All @@ -253,9 +348,13 @@ impl ServalConnection {
});

match res {
Some(ResultMsg::AcquisitionStart { detector_config }) => {
return Ok(Some(PyDetectorConfig {
Some(ResultMsg::AcquisitionStart {
detector_config,
first_frame_meta,
}) => {
return Ok(Some(PendingAcquisition {
config: detector_config,
first_frame_meta,
}))
}
msg @ Some(ResultMsg::End { .. }) | msg @ Some(ResultMsg::FrameStack { .. }) => {
Expand Down
22 changes: 19 additions & 3 deletions libertem_asi_mpx3/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError};
use ipc_test::{SHMHandle, SharedSlabAllocator};
use log::{debug, error, info, trace, warn};
use serval_client::{DetectorConfig, ServalClient};
use serval_client::{DetectorConfig, ServalClient, ServalError};

use crate::{
common::{DType, FrameMeta},
Expand All @@ -31,6 +31,7 @@ pub enum ResultMsg {

AcquisitionStart {
detector_config: DetectorConfig,
first_frame_meta: FrameMeta,
},

/// A stack of frames, part of an acquisition
Expand Down Expand Up @@ -282,6 +283,7 @@ enum AcquisitionError {
ConfigurationError { msg: String },
ParseError { msg: String },
ConnectionError { msg: String },
APIError { msg: String },
}

impl Display for AcquisitionError {
Expand All @@ -308,6 +310,9 @@ impl Display for AcquisitionError {
AcquisitionError::ConnectionError { msg } => {
write!(f, "connection error: {msg}")
}
AcquisitionError::APIError { msg } => {
write!(f, "serval HTTP API error: {msg}")
}
}
}
}
Expand All @@ -326,6 +331,14 @@ impl<T> From<SendError<T>> for AcquisitionError {
}
}

impl From<ServalError> for AcquisitionError {
fn from(value: ServalError) -> Self {
Self::APIError {
msg: value.to_string(),
}
}
}

/// With a running acquisition, check for control messages;
/// especially convert `ControlMsg::StopThread` to `AcquisitionError::Cancelled`.
fn check_for_control(control_channel: &Receiver<ControlMsg>) -> Result<(), AcquisitionError> {
Expand Down Expand Up @@ -374,7 +387,7 @@ fn passive_acquisition(
};

// block until we get the first frame:
let _first_frame = match peek_header(&mut stream) {
let first_frame_meta = match peek_header(&mut stream) {
Ok(m) => m,
Err(AcquisitionError::ConnectionError { msg }) => {
warn!("connection error while peeking first frame: {msg}; reconnecting");
Expand All @@ -385,12 +398,13 @@ fn passive_acquisition(

// then, we should be able to reliably get the detector config
// (we assume once data arrives, the config is immutable)
let detector_config = client.get_detector_config();
let detector_config = client.get_detector_config()?;

acquisition(
control_channel,
from_thread_s,
&detector_config,
&first_frame_meta,
&mut stream,
frame_stack_size,
shm,
Expand All @@ -408,6 +422,7 @@ fn acquisition(
to_thread_r: &Receiver<ControlMsg>,
from_thread_s: &Sender<ResultMsg>,
detector_config: &DetectorConfig,
first_frame_meta: &FrameMeta,
stream: &mut TcpStream,
frame_stack_size: usize,
shm: &mut SharedSlabAllocator,
Expand All @@ -417,6 +432,7 @@ fn acquisition(

from_thread_s.send(ResultMsg::AcquisitionStart {
detector_config: detector_config.clone(),
first_frame_meta: first_frame_meta.clone(),
})?;

debug!("acquisition starting");
Expand Down
Loading

0 comments on commit abf4e5c

Please sign in to comment.