Skip to content

Commit

Permalink
light-client: Move backoff logic from animator to light client
Browse files Browse the repository at this point in the history
  • Loading branch information
mrozycki committed Dec 14, 2023
1 parent fa82074 commit a3b8b17
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 17 additions & 55 deletions animator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ pub enum ControllerError {
AnimationFactoryError(#[from] AnimationFactoryError),
}

#[derive(PartialEq)]
enum ConnectionStatus {
Healthy,
IntermittentFailure,
ProlongedFailure,
}

pub struct ControllerState {
animation: Option<AnimationPlugin>,
last_frame: DateTime<Utc>,
Expand All @@ -59,7 +52,7 @@ pub struct Controller {
}

impl Controller {
pub fn new<P: AsRef<Path>>(
fn new<P: AsRef<Path>>(
points: Vec<(f64, f64, f64)>,
plugin_dir: P,
client: Box<dyn rustmas_light_client::LightClient + Sync + Send>,
Expand Down Expand Up @@ -113,28 +106,22 @@ impl Controller {
client: Box<dyn rustmas_light_client::LightClient + Sync + Send>,
point_count: usize,
) {
let start_backoff_delay: Duration = Duration::seconds(1);
let max_backoff_delay: Duration = Duration::seconds(8);

let mut backoff_delay = start_backoff_delay;
let mut status = ConnectionStatus::Healthy;
let now = Utc::now();
let mut next_check = now;
let mut next_check = Utc::now();

loop {
let now = Utc::now();
tokio::time::sleep(
(next_check - Utc::now())
(next_check - now)
.clamp(Duration::milliseconds(0), Duration::milliseconds(33))
.to_std()
.unwrap(),
)
.await;

let now = Utc::now();
let (Ok(frame),) = ({
let mut state = state.lock().await;
if now < state.next_frame {
next_check = state.next_frame.min(now + backoff_delay);
next_check = state.next_frame.min(now + Duration::seconds(1));
continue;
}
state.next_frame = if state.fps != 0.0 {
Expand All @@ -156,37 +143,10 @@ impl Controller {
continue;
};

match client.display_frame(&frame).await {
Ok(_) => {
if status != ConnectionStatus::Healthy {
info!("Regained connection to light client");
}
status = ConnectionStatus::Healthy;
backoff_delay = start_backoff_delay;
}
Err(LightClientError::ConnectionLost) => {
next_check = now + backoff_delay;
backoff_delay = (backoff_delay * 2).min(max_backoff_delay);
if backoff_delay < max_backoff_delay {
status = ConnectionStatus::IntermittentFailure;
warn!(
"Failed to send frame to remote lights, will retry in {:.2} seconds",
backoff_delay.num_milliseconds() as f64 / 1000.0
);
} else if status != ConnectionStatus::ProlongedFailure {
status = ConnectionStatus::ProlongedFailure;
warn!(
"Lost connection to lights, will continue retrying every {:.2} seconds",
max_backoff_delay.num_milliseconds() as f64 / 1000.0
);
}
}
Err(LightClientError::ProcessExited) => {
warn!("Light client exited, exiting");
return;
}
_ => (),
};
if client.display_frame(&frame).await == Err(LightClientError::ProcessExited) {
warn!("Light client exited, exiting");
return;
}
}
}

Expand Down Expand Up @@ -348,17 +308,19 @@ impl ControllerBuilder {

pub fn http_lights(mut self, path: &str) -> Result<Self, Box<dyn Error>> {
info!("Using http light client with endpoint {}", path);
self.client_builder = self
.client_builder
.with(Box::new(client::http::HttpLightClient::new(path)));
self.client_builder = self.client_builder.with(Box::new(
client::http::HttpLightClient::new(path).with_backoff(),
));
Ok(self)
}

pub fn tcp_lights(mut self, path: &str) -> Result<Self, Box<dyn Error>> {
info!("Using tcp light client with endpoint {}", path);
self.client_builder = self
.client_builder
.with(Box::new(client::tcp::TcpLightClient::new(path)));
self.client_builder = self.client_builder.with(Box::new(
client::tcp::TcpLightClient::new(path)
.with_backoff()
.with_start_delay(Duration::milliseconds(125)),
));
Ok(self)
}

Expand Down
1 change: 1 addition & 0 deletions light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lightfx = { path = "../lightfx" }

async-trait = "0.1.57"
csv = "1.1.6"
chrono = "0.4.31"
futures-util = "0.3.28"
log = "0.4.17"
reqwest = "0.11.12"
Expand Down
110 changes: 110 additions & 0 deletions light-client/src/backoff_decorator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::time::Duration as StdDuration;

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use log::{info, warn};
use tokio::sync::Mutex;

use crate::{LightClient, LightClientError};

#[derive(PartialEq)]
enum ConnectionStatus {
Healthy,
IntermittentFailure,
ProlongedFailure,
}

struct BackoffState {
status: ConnectionStatus,
delay: Duration,
next_check: DateTime<Utc>,
}

pub struct BackoffDecorator<T: LightClient> {
inner: T,
start_delay: Duration,
max_delay: Duration,
timeout: StdDuration,
state: Mutex<BackoffState>,
}

impl<T: LightClient> BackoffDecorator<T> {
pub fn new(light_client: T) -> BackoffDecorator<T> {
let default_start_delay = Duration::seconds(1);
Self {
inner: light_client,
start_delay: default_start_delay,
max_delay: Duration::seconds(8),
timeout: StdDuration::from_millis(100),
state: Mutex::new(BackoffState {
status: ConnectionStatus::Healthy,
delay: default_start_delay,
next_check: Utc::now(),
}),
}
}

pub fn with_start_delay(mut self, delay: Duration) -> Self {
self.start_delay = delay;
self
}

pub fn with_max_delay(mut self, delay: Duration) -> Self {
self.max_delay = delay;
self
}

pub fn with_timeout(mut self, timeout: StdDuration) -> Self {
self.timeout = timeout;
self
}
}

#[async_trait]
impl<T> LightClient for BackoffDecorator<T>
where
T: LightClient + Send + Sync,
{
async fn display_frame(&self, frame: &lightfx::Frame) -> Result<(), LightClientError> {
let mut state = self.state.lock().await;
let now = Utc::now();
if now < state.next_check && state.status != ConnectionStatus::Healthy {
return Err(LightClientError::ConnectionLost);
}

match tokio::time::timeout(self.timeout, self.inner.display_frame(frame)).await {
Ok(Ok(_)) => {
if state.status != ConnectionStatus::Healthy {
info!("Regained connection to light client");
}
state.status = ConnectionStatus::Healthy;
state.delay = self.start_delay;
state.next_check = now;
Ok(())
}
Ok(Err(LightClientError::ConnectionLost)) | Err(_) => {
state.next_check = now + state.delay;
if state.delay < self.max_delay {
state.status = ConnectionStatus::IntermittentFailure;
warn!(
"Failed to send frame to remote lights, will retry in {:.2} seconds",
state.delay.num_milliseconds() as f64 / 1000.0
);
} else if state.status != ConnectionStatus::ProlongedFailure {
state.status = ConnectionStatus::ProlongedFailure;
warn!(
"Lost connection to lights, will continue retrying every {:.2} seconds",
self.max_delay.num_milliseconds() as f64 / 1000.0
);
}
state.delay = (state.delay * 2).min(self.max_delay);
Err(LightClientError::ConnectionLost)
}
Ok(Err(LightClientError::ProcessExited)) => {
warn!("Light client exited, exiting");
Err(LightClientError::ProcessExited)
}
_ => Err(LightClientError::Unlikely),
}
}
}
6 changes: 5 additions & 1 deletion light-client/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{LightClient, LightClientError};
use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError};
use async_trait::async_trait;
use lightfx::Frame;
use log::debug;
Expand All @@ -23,6 +23,10 @@ impl HttpLightClient {
.unwrap(),
}
}

pub fn with_backoff(self) -> BackoffDecorator<Self> {
BackoffDecorator::new(self)
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions light-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod backoff_decorator;
pub mod combined;
pub mod feedback;
pub mod http;
Expand Down
8 changes: 6 additions & 2 deletions light-client/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{LightClient, LightClientError};
use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError};
use async_trait::async_trait;
use lightfx::Frame;
use log::{debug, error, info};
Expand All @@ -20,8 +20,12 @@ impl TcpLightClient {
}
}

pub fn with_backoff(self) -> BackoffDecorator<Self> {
BackoffDecorator::new(self)
}

async fn connect(&self) -> Result<(), Box<dyn Error>> {
info!("Connecting to remote lights via TCP");
debug!("Connecting to remote lights via TCP");
let mut stream = self.stream.lock().await;
let s = TcpStream::connect(&self.url).await?;
s.set_nodelay(true)?;
Expand Down

0 comments on commit a3b8b17

Please sign in to comment.