diff --git a/drmemd/Cargo.toml b/drmemd/Cargo.toml index 43960ec..819e284 100644 --- a/drmemd/Cargo.toml +++ b/drmemd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "drmemd" -version = "0.4.0" +version = "0.4.1" authors = ["Rich Neswold "] edition = "2021" description = "Main process of the DrMem control system" diff --git a/drmemd/src/logic/compile.rs b/drmemd/src/logic/compile.rs index 034e40e..c612544 100644 --- a/drmemd/src/logic/compile.rs +++ b/drmemd/src/logic/compile.rs @@ -2934,4 +2934,40 @@ mod tests { ); } } + + #[test] + fn test_solar_usage() { + const DATA: &[(&str, bool)] = &[ + // Make sure literals, variables, and time values don't + // return a field. + ("{a}", false), + ("1", false), + ("1.0", false), + ("true", false), + ("#green", false), + ("\"test\"", false), + ("{utc:second}", false), + // Make sure the solar values return true. + ("{solar:alt}", true), + ("{solar:dec}", true), + ("{solar:ra}", true), + ("{solar:az}", true), + // Now test more complicated expressions to make sure each + // subtree is correctly compared. + ("not (2 > 3)", false), + ("2 + 2", false), + ("{solar:alt} + 2", true), + ("2 + {solar:az}", true), + ("{solar:dec} + {solar:az}", true), + ]; + + for (expr, result) in DATA { + assert_eq!( + &to_expr(expr).uses_solar(), + result, + "error using {}", + expr + ); + } + } } diff --git a/drmemd/src/logic/mod.rs b/drmemd/src/logic/mod.rs index d3b835c..19a852f 100644 --- a/drmemd/src/logic/mod.rs +++ b/drmemd/src/logic/mod.rs @@ -1,5 +1,5 @@ use drmem_api::{client, device, driver, Result}; -use futures::future::join_all; +use futures::future::{join_all, pending}; use std::collections::HashMap; use std::convert::Infallible; use std::sync::Arc; @@ -395,7 +395,7 @@ impl Node { let wait_for_time = async { match self.time_ch.as_mut() { - None => None, + None => pending().await, Some(s) => s.next().await, } }; @@ -407,23 +407,32 @@ impl Node { let wait_for_solar = async { match self.solar_ch.as_mut() { - None => None, - Some(ch) => ch.recv().await.ok(), + None => pending().await, + Some(ch) => ch.recv().await, } }; #[rustfmt::skip] tokio::select! { - // Wait for the next reading to arrive. All the - // incoming streams have been combined into one and - // the returned value is a pair consisting of an index - // and the actual reading. + biased; - Some((idx, reading)) = self.in_stream.next() => { - // Save the reading in our array for future - // recalculations. + // If we need the solar channel, wait for the next + // update. - self.inputs[idx] = Some(reading.value); + v = wait_for_solar => { + match v { + Ok(v) => solar = Some(v), + Err(broadcast::error::RecvError::Lagged(_)) => { + warn!("not handling solar info fast enough"); + continue + } + Err(broadcast::error::RecvError::Closed) => { + error!("solar info channel is closed"); + return Err(drmem_api::Error::OperationError( + "solar channel closed".into() + )); + } + } } // If we need the time channel, wait for the next @@ -433,11 +442,16 @@ impl Node { time = v; } - // If we need the solar channel, wait for the next - // update. + // Wait for the next reading to arrive. All the + // incoming streams have been combined into one and + // the returned value is a pair consisting of an index + // and the actual reading. + + Some((idx, reading)) = self.in_stream.next() => { + // Save the reading in our array for future + // recalculations. - Some(v) = wait_for_solar => { - solar = Some(v); + self.inputs[idx] = Some(reading.value); } } @@ -557,16 +571,16 @@ mod test { // Create the common channels used by DrMem. let (tx_req, mut c_recv) = mpsc::channel(100); - let (tx_tod, rx_tod) = broadcast::channel(100); - let (tx_solar, rx_solar) = broadcast::channel(100); + let (tx_tod, _) = broadcast::channel(100); + let (tx_solar, _) = broadcast::channel(100); // Start the logic block with the proper communciation // channels and configuration. let node = Node::start( client::RequestChan::new(tx_req), - rx_tod, - rx_solar, + tx_tod.subscribe(), + tx_solar.subscribe(), cfg, ); @@ -954,6 +968,62 @@ mod test { assert_eq!(emu.await.unwrap(), Ok(true)); } + // Test a basic logic block in which forwards a solar parameter to + // a memory device. + + #[tokio::test] + async fn test_basic_solar_node() { + const OUT1: &str = "device:out1"; + const OUT2: &str = "device:out2"; + let cfg = build_config( + &[], + &[("alt", OUT1), ("dec", OUT2)], + &[], + &["{solar:alt} -> {alt}", "{solar:dec} -> {dec}"], + ); + let (tx_out1, mut rx_out1) = mpsc::channel(100); + let (tx_out2, mut rx_out2) = mpsc::channel(100); + + let (_, tx_solar, emu, tx_stop) = Emulator::start( + vec![], + vec![(OUT1.into(), tx_out1), (OUT2.into(), tx_out2)], + cfg, + ) + .await + .unwrap(); + + // Send a value and see if it was forwarded. + + assert!(tx_solar + .send(Arc::new(solar::SolarInfo { + elevation: 1.0, + azimuth: 2.0, + right_ascension: 3.0, + declination: 4.0 + })) + .is_ok()); + + { + let (value, rpy) = rx_out1.recv().await.unwrap(); + let _ = rpy.send(Ok(value.clone())); + + assert_eq!(value, device::Value::Flt(1.0)); + } + + { + let (value, rpy) = rx_out2.recv().await.unwrap(); + let _ = rpy.send(Ok(value.clone())); + + assert_eq!(value, device::Value::Flt(4.0)); + } + + // Stop the emulator and see that its return status is good. + + let _ = tx_stop.send(()); + + assert_eq!(emu.await.unwrap(), Ok(true)); + } + // Test a logic block with two outputs. Make sure they are sent // "in parallel". diff --git a/drmemd/src/logic/solar.rs b/drmemd/src/logic/solar.rs index 1d2dbfe..b43d0ad 100644 --- a/drmemd/src/logic/solar.rs +++ b/drmemd/src/logic/solar.rs @@ -135,7 +135,7 @@ pub fn create_task( lat: f64, long: f64, ) -> (broadcast::Sender, broadcast::Receiver) { - let (tx, rx) = broadcast::channel(1); + let (tx, rx) = broadcast::channel(10); let tx_copy = tx.clone(); tokio::spawn( diff --git a/drmemd/src/main.rs b/drmemd/src/main.rs index f076c1f..108e396 100644 --- a/drmemd/src/main.rs +++ b/drmemd/src/main.rs @@ -156,37 +156,34 @@ async fn run() -> Result<()> { } } - // Start the time-of-day task. This needs to be done *before* - // any logic blocks are started because logic blocks *may* - // have an expression that uses the time-of-day. + // Create a nested scope so that the tod and solar handles are + // freed up. - let (tx_tod, rx_tod) = logic::tod::create_task(); - - // Start the solar task. This, too, needs to be done before - // any logic blocks are started. + { + // Start the time-of-day task. This needs to be done + // *before* any logic blocks are started because logic + // blocks *may* have an expression that uses the + // time-of-day. - let (tx_solar, rx_solar) = - logic::solar::create_task(cfg.latitude, cfg.longitude); + let (tx_tod, _) = logic::tod::create_task(); - // Iterate through the [[logic]] sections of the config. + // Start the solar task. This, too, needs to be done + // before any logic blocks are started. - for logic in cfg.logic { - tasks.push(wrap_task(logic::Node::start( - tx_clnt_req.clone(), - tx_tod.subscribe(), - tx_solar.subscribe(), - logic, - ))); - } + let (tx_solar, _) = + logic::solar::create_task(cfg.latitude, cfg.longitude); - // Now that we've given all the logic blocks receive handles - // for the time-of-day and solar tasks, we can free up our - // copy. If we freed up our copy *before* creating new - // subscriptions, the tod or solar task may have briefly seen - // no clients and would exit. + // Iterate through the [[logic]] sections of the config. - std::mem::drop(rx_tod); - std::mem::drop(rx_solar); + for logic in cfg.logic { + tasks.push(wrap_task(logic::Node::start( + tx_clnt_req.clone(), + tx_tod.subscribe(), + tx_solar.subscribe(), + logic, + ))); + } + } // Now run all the tasks.