From 180f7d0685f5375027d41a22d8122a5a26482001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20R=C3=B3=C5=BCycki?= Date: Thu, 14 Dec 2023 11:19:28 +0100 Subject: [PATCH] light-client: Move backoff logic from animator to light client --- Cargo.lock | 1 + animator/src/lib.rs | 72 ++++------------- light-client/Cargo.toml | 1 + light-client/src/backoff_decorator.rs | 110 ++++++++++++++++++++++++++ light-client/src/http.rs | 6 +- light-client/src/lib.rs | 1 + light-client/src/tcp.rs | 8 +- 7 files changed, 141 insertions(+), 58 deletions(-) create mode 100644 light-client/src/backoff_decorator.rs diff --git a/Cargo.lock b/Cargo.lock index 1d3af1c..a5b06c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4835,6 +4835,7 @@ name = "rustmas-light-client" version = "0.1.0" dependencies = [ "async-trait", + "chrono", "csv", "futures-util", "lightfx 0.1.0", diff --git a/animator/src/lib.rs b/animator/src/lib.rs index 8b1f201..2fc0bac 100644 --- a/animator/src/lib.rs +++ b/animator/src/lib.rs @@ -35,13 +35,6 @@ pub enum ControllerError { AnimationFactoryError(#[from] AnimationFactoryError), } -#[derive(PartialEq)] -enum ConnectionStatus { - Healthy, - IntermittentFailure, - ProlongedFailure, -} - pub struct ControllerState { animation: Option, last_frame: DateTime, @@ -59,7 +52,7 @@ pub struct Controller { } impl Controller { - pub fn new>( + fn new>( points: Vec<(f64, f64, f64)>, plugin_dir: P, client: Box, @@ -113,28 +106,22 @@ impl Controller { client: Box, point_count: usize, ) { - let start_backoff_delay: Duration = Duration::seconds(1); - let max_backoff_delay: Duration = Duration::seconds(8); - - let mut backoff_delay = start_backoff_delay; - let mut status = ConnectionStatus::Healthy; - let now = Utc::now(); - let mut next_check = now; + let mut next_check = Utc::now(); loop { + let now = Utc::now(); tokio::time::sleep( - (next_check - Utc::now()) + (next_check - now) .clamp(Duration::milliseconds(0), Duration::milliseconds(33)) .to_std() .unwrap(), ) .await; - let now = Utc::now(); let (Ok(frame),) = ({ let mut state = state.lock().await; if now < state.next_frame { - next_check = state.next_frame.min(now + backoff_delay); + next_check = state.next_frame.min(now + Duration::seconds(1)); continue; } state.next_frame = if state.fps != 0.0 { @@ -156,37 +143,10 @@ impl Controller { continue; }; - match client.display_frame(&frame).await { - Ok(_) => { - if status != ConnectionStatus::Healthy { - info!("Regained connection to light client"); - } - status = ConnectionStatus::Healthy; - backoff_delay = start_backoff_delay; - } - Err(LightClientError::ConnectionLost) => { - next_check = now + backoff_delay; - backoff_delay = (backoff_delay * 2).min(max_backoff_delay); - if backoff_delay < max_backoff_delay { - status = ConnectionStatus::IntermittentFailure; - warn!( - "Failed to send frame to remote lights, will retry in {:.2} seconds", - backoff_delay.num_milliseconds() as f64 / 1000.0 - ); - } else if status != ConnectionStatus::ProlongedFailure { - status = ConnectionStatus::ProlongedFailure; - warn!( - "Lost connection to lights, will continue retrying every {:.2} seconds", - max_backoff_delay.num_milliseconds() as f64 / 1000.0 - ); - } - } - Err(LightClientError::ProcessExited) => { - warn!("Light client exited, exiting"); - return; - } - _ => (), - }; + if client.display_frame(&frame).await == Err(LightClientError::ProcessExited) { + warn!("Light client exited, exiting"); + return; + } } } @@ -348,17 +308,19 @@ impl ControllerBuilder { pub fn http_lights(mut self, path: &str) -> Result> { info!("Using http light client with endpoint {}", path); - self.client_builder = self - .client_builder - .with(Box::new(client::http::HttpLightClient::new(path))); + self.client_builder = self.client_builder.with(Box::new( + client::http::HttpLightClient::new(path).with_backoff(), + )); Ok(self) } pub fn tcp_lights(mut self, path: &str) -> Result> { info!("Using tcp light client with endpoint {}", path); - self.client_builder = self - .client_builder - .with(Box::new(client::tcp::TcpLightClient::new(path))); + self.client_builder = self.client_builder.with(Box::new( + client::tcp::TcpLightClient::new(path) + .with_backoff() + .with_start_delay(Duration::milliseconds(125)), + )); Ok(self) } diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml index 382c089..731f038 100755 --- a/light-client/Cargo.toml +++ b/light-client/Cargo.toml @@ -8,6 +8,7 @@ lightfx = { path = "../lightfx" } async-trait = "0.1.57" csv = "1.1.6" +chrono = "0.4.31" futures-util = "0.3.28" log = "0.4.17" reqwest = "0.11.12" diff --git a/light-client/src/backoff_decorator.rs b/light-client/src/backoff_decorator.rs new file mode 100644 index 0000000..bcb44e3 --- /dev/null +++ b/light-client/src/backoff_decorator.rs @@ -0,0 +1,110 @@ +use std::time::Duration as StdDuration; + +use async_trait::async_trait; +use chrono::{DateTime, Duration, Utc}; +use log::{info, warn}; +use tokio::sync::Mutex; + +use crate::{LightClient, LightClientError}; + +#[derive(PartialEq)] +enum ConnectionStatus { + Healthy, + IntermittentFailure, + ProlongedFailure, +} + +struct BackoffState { + status: ConnectionStatus, + delay: Duration, + next_check: DateTime, +} + +pub struct BackoffDecorator { + inner: T, + start_delay: Duration, + max_delay: Duration, + timeout: StdDuration, + state: Mutex, +} + +impl BackoffDecorator { + pub fn new(light_client: T) -> BackoffDecorator { + let default_start_delay = Duration::seconds(1); + Self { + inner: light_client, + start_delay: default_start_delay, + max_delay: Duration::seconds(8), + timeout: StdDuration::from_millis(100), + state: Mutex::new(BackoffState { + status: ConnectionStatus::Healthy, + delay: default_start_delay, + next_check: Utc::now(), + }), + } + } + + pub fn with_start_delay(mut self, delay: Duration) -> Self { + self.start_delay = delay; + self + } + + pub fn with_max_delay(mut self, delay: Duration) -> Self { + self.max_delay = delay; + self + } + + pub fn with_timeout(mut self, timeout: StdDuration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl LightClient for BackoffDecorator +where + T: LightClient + Send + Sync, +{ + async fn display_frame(&self, frame: &lightfx::Frame) -> Result<(), LightClientError> { + let mut state = self.state.lock().await; + let now = Utc::now(); + if now < state.next_check && state.status != ConnectionStatus::Healthy { + return Err(LightClientError::ConnectionLost); + } + + match tokio::time::timeout(self.timeout, self.inner.display_frame(frame)).await { + Ok(Ok(_)) => { + if state.status != ConnectionStatus::Healthy { + info!("Regained connection to light client"); + } + state.status = ConnectionStatus::Healthy; + state.delay = self.start_delay; + state.next_check = now; + Ok(()) + } + Ok(Err(LightClientError::ConnectionLost)) | Err(_) => { + state.next_check = now + state.delay; + if state.delay < self.max_delay { + state.status = ConnectionStatus::IntermittentFailure; + warn!( + "Failed to send frame to remote lights, will retry in {:.2} seconds", + state.delay.num_milliseconds() as f64 / 1000.0 + ); + } else if state.status != ConnectionStatus::ProlongedFailure { + state.status = ConnectionStatus::ProlongedFailure; + warn!( + "Lost connection to lights, will continue retrying every {:.2} seconds", + self.max_delay.num_milliseconds() as f64 / 1000.0 + ); + } + state.delay = (state.delay * 2).min(self.max_delay); + Err(LightClientError::ConnectionLost) + } + Ok(Err(LightClientError::ProcessExited)) => { + warn!("Light client exited, exiting"); + Err(LightClientError::ProcessExited) + } + _ => Err(LightClientError::Unlikely), + } + } +} diff --git a/light-client/src/http.rs b/light-client/src/http.rs index 08e8649..632f183 100644 --- a/light-client/src/http.rs +++ b/light-client/src/http.rs @@ -1,4 +1,4 @@ -use crate::{LightClient, LightClientError}; +use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError}; use async_trait::async_trait; use lightfx::Frame; use log::debug; @@ -23,6 +23,10 @@ impl HttpLightClient { .unwrap(), } } + + pub fn with_backoff(self) -> BackoffDecorator { + BackoffDecorator::new(self) + } } #[async_trait] diff --git a/light-client/src/lib.rs b/light-client/src/lib.rs index 37910a3..151c565 100755 --- a/light-client/src/lib.rs +++ b/light-client/src/lib.rs @@ -1,3 +1,4 @@ +pub mod backoff_decorator; pub mod combined; pub mod feedback; pub mod http; diff --git a/light-client/src/tcp.rs b/light-client/src/tcp.rs index b09b0cc..7d0c9cf 100644 --- a/light-client/src/tcp.rs +++ b/light-client/src/tcp.rs @@ -1,4 +1,4 @@ -use crate::{LightClient, LightClientError}; +use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError}; use async_trait::async_trait; use lightfx::Frame; use log::{debug, error, info}; @@ -20,8 +20,12 @@ impl TcpLightClient { } } + pub fn with_backoff(self) -> BackoffDecorator { + BackoffDecorator::new(self) + } + async fn connect(&self) -> Result<(), Box> { - info!("Connecting to remote lights via TCP"); + debug!("Connecting to remote lights via TCP"); let mut stream = self.stream.lock().await; let s = TcpStream::connect(&self.url).await?; s.set_nodelay(true)?;