Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

light-client: Move backoff logic from animator to light client #220

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 17 additions & 55 deletions animator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ pub enum ControllerError {
AnimationFactoryError(#[from] AnimationFactoryError),
}

#[derive(PartialEq)]
enum ConnectionStatus {
Healthy,
IntermittentFailure,
ProlongedFailure,
}

pub struct ControllerState {
animation: Option<AnimationPlugin>,
last_frame: DateTime<Utc>,
Expand All @@ -59,7 +52,7 @@ pub struct Controller {
}

impl Controller {
pub fn new<P: AsRef<Path>>(
fn new<P: AsRef<Path>>(
points: Vec<(f64, f64, f64)>,
plugin_dir: P,
client: Box<dyn rustmas_light_client::LightClient + Sync + Send>,
Expand Down Expand Up @@ -113,28 +106,22 @@ impl Controller {
client: Box<dyn rustmas_light_client::LightClient + Sync + Send>,
point_count: usize,
) {
let start_backoff_delay: Duration = Duration::seconds(1);
let max_backoff_delay: Duration = Duration::seconds(8);

let mut backoff_delay = start_backoff_delay;
let mut status = ConnectionStatus::Healthy;
let now = Utc::now();
let mut next_check = now;
let mut next_check = Utc::now();

loop {
let now = Utc::now();
tokio::time::sleep(
(next_check - Utc::now())
(next_check - now)
.clamp(Duration::milliseconds(0), Duration::milliseconds(33))
.to_std()
.unwrap(),
)
.await;

let now = Utc::now();
let (Ok(frame),) = ({
let mut state = state.lock().await;
if now < state.next_frame {
next_check = state.next_frame.min(now + backoff_delay);
next_check = state.next_frame.min(now + Duration::seconds(1));
continue;
}
state.next_frame = if state.fps != 0.0 {
Expand All @@ -156,37 +143,10 @@ impl Controller {
continue;
};

match client.display_frame(&frame).await {
Ok(_) => {
if status != ConnectionStatus::Healthy {
info!("Regained connection to light client");
}
status = ConnectionStatus::Healthy;
backoff_delay = start_backoff_delay;
}
Err(LightClientError::ConnectionLost) => {
next_check = now + backoff_delay;
backoff_delay = (backoff_delay * 2).min(max_backoff_delay);
if backoff_delay < max_backoff_delay {
status = ConnectionStatus::IntermittentFailure;
warn!(
"Failed to send frame to remote lights, will retry in {:.2} seconds",
backoff_delay.num_milliseconds() as f64 / 1000.0
);
} else if status != ConnectionStatus::ProlongedFailure {
status = ConnectionStatus::ProlongedFailure;
warn!(
"Lost connection to lights, will continue retrying every {:.2} seconds",
max_backoff_delay.num_milliseconds() as f64 / 1000.0
);
}
}
Err(LightClientError::ProcessExited) => {
warn!("Light client exited, exiting");
return;
}
_ => (),
};
if client.display_frame(&frame).await == Err(LightClientError::ProcessExited) {
warn!("Light client exited, exiting");
return;
}
}
}

Expand Down Expand Up @@ -348,17 +308,19 @@ impl ControllerBuilder {

pub fn http_lights(mut self, path: &str) -> Result<Self, Box<dyn Error>> {
info!("Using http light client with endpoint {}", path);
self.client_builder = self
.client_builder
.with(Box::new(client::http::HttpLightClient::new(path)));
self.client_builder = self.client_builder.with(Box::new(
client::http::HttpLightClient::new(path).with_backoff(),
));
Ok(self)
}

pub fn tcp_lights(mut self, path: &str) -> Result<Self, Box<dyn Error>> {
info!("Using tcp light client with endpoint {}", path);
self.client_builder = self
.client_builder
.with(Box::new(client::tcp::TcpLightClient::new(path)));
self.client_builder = self.client_builder.with(Box::new(
client::tcp::TcpLightClient::new(path)
.with_backoff()
.with_start_delay(Duration::milliseconds(125)),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add a comment how this magic value was chosen?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XKCD Randomness

));
Ok(self)
}

Expand Down
1 change: 1 addition & 0 deletions light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lightfx = { path = "../lightfx" }

async-trait = "0.1.57"
csv = "1.1.6"
chrono = "0.4.31"
futures-util = "0.3.28"
log = "0.4.17"
reqwest = "0.11.12"
Expand Down
110 changes: 110 additions & 0 deletions light-client/src/backoff_decorator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::time::Duration as StdDuration;

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use log::{info, warn};
use tokio::sync::Mutex;

use crate::{LightClient, LightClientError};

#[derive(PartialEq)]
enum ConnectionStatus {
Healthy,
IntermittentFailure,
ProlongedFailure,
}

struct BackoffState {
status: ConnectionStatus,
delay: Duration,
next_check: DateTime<Utc>,
}

pub struct BackoffDecorator<T: LightClient> {
inner: T,
start_delay: Duration,
max_delay: Duration,
timeout: StdDuration,
state: Mutex<BackoffState>,
}

impl<T: LightClient> BackoffDecorator<T> {
pub fn new(light_client: T) -> BackoffDecorator<T> {
let default_start_delay = Duration::seconds(1);
Self {
inner: light_client,
start_delay: default_start_delay,
max_delay: Duration::seconds(8),
timeout: StdDuration::from_millis(100),
state: Mutex::new(BackoffState {
status: ConnectionStatus::Healthy,
delay: default_start_delay,
next_check: Utc::now(),
}),
}
}

pub fn with_start_delay(mut self, delay: Duration) -> Self {
self.start_delay = delay;
self
}

pub fn with_max_delay(mut self, delay: Duration) -> Self {
self.max_delay = delay;
self
}

pub fn with_timeout(mut self, timeout: StdDuration) -> Self {
self.timeout = timeout;
self
}
}

#[async_trait]
impl<T> LightClient for BackoffDecorator<T>
where
T: LightClient + Send + Sync,
{
async fn display_frame(&self, frame: &lightfx::Frame) -> Result<(), LightClientError> {
let mut state = self.state.lock().await;
let now = Utc::now();
if now < state.next_check && state.status != ConnectionStatus::Healthy {
return Err(LightClientError::ConnectionLost);
}

match tokio::time::timeout(self.timeout, self.inner.display_frame(frame)).await {
Ok(Ok(_)) => {
if state.status != ConnectionStatus::Healthy {
info!("Regained connection to light client");
}
state.status = ConnectionStatus::Healthy;
state.delay = self.start_delay;
state.next_check = now;
Ok(())
}
Ok(Err(LightClientError::ConnectionLost)) | Err(_) => {
state.next_check = now + state.delay;
if state.delay < self.max_delay {
state.status = ConnectionStatus::IntermittentFailure;
warn!(
"Failed to send frame to remote lights, will retry in {:.2} seconds",
state.delay.num_milliseconds() as f64 / 1000.0
);
} else if state.status != ConnectionStatus::ProlongedFailure {
state.status = ConnectionStatus::ProlongedFailure;
warn!(
"Lost connection to lights, will continue retrying every {:.2} seconds",
self.max_delay.num_milliseconds() as f64 / 1000.0
);
}
state.delay = (state.delay * 2).min(self.max_delay);
Err(LightClientError::ConnectionLost)
}
Ok(Err(LightClientError::ProcessExited)) => {
warn!("Light client exited, exiting");
Err(LightClientError::ProcessExited)
}
_ => Err(LightClientError::Unlikely),
}
}
}
6 changes: 5 additions & 1 deletion light-client/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{LightClient, LightClientError};
use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError};
use async_trait::async_trait;
use lightfx::Frame;
use log::debug;
Expand All @@ -23,6 +23,10 @@ impl HttpLightClient {
.unwrap(),
}
}

pub fn with_backoff(self) -> BackoffDecorator<Self> {
BackoffDecorator::new(self)
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions light-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod backoff_decorator;
pub mod combined;
pub mod feedback;
pub mod http;
Expand Down
8 changes: 6 additions & 2 deletions light-client/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{LightClient, LightClientError};
use crate::{backoff_decorator::BackoffDecorator, LightClient, LightClientError};
use async_trait::async_trait;
use lightfx::Frame;
use log::{debug, error, info};
Expand All @@ -20,8 +20,12 @@ impl TcpLightClient {
}
}

pub fn with_backoff(self) -> BackoffDecorator<Self> {
BackoffDecorator::new(self)
}

async fn connect(&self) -> Result<(), Box<dyn Error>> {
info!("Connecting to remote lights via TCP");
debug!("Connecting to remote lights via TCP");
let mut stream = self.stream.lock().await;
let s = TcpStream::connect(&self.url).await?;
s.set_nodelay(true)?;
Expand Down
Loading