From be5c9ee1b8ba7c990acf2df8e1fa667623065f3a Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Thu, 22 Aug 2024 10:19:37 +0200 Subject: [PATCH] fix (CR): Correct doc, speed processing and refactorize according to Code review Signed-off-by: Jeremy HERGAULT --- prosa/src/event/speed.rs | 44 ++++++++++++++++++++-------------------- prosa/src/inj/proc.rs | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/prosa/src/event/speed.rs b/prosa/src/event/speed.rs index 1c9b2f4..768fc49 100644 --- a/prosa/src/event/speed.rs +++ b/prosa/src/event/speed.rs @@ -48,7 +48,7 @@ pub struct Speed { } impl Speed { - /// Create a new speed from number of desire sample (5 minimum) + /// Create a new speed using a number of desired samples (5 minimum) pub fn new(size_for_average: u16) -> Speed { let size = if size_for_average > 5 { size_for_average as usize @@ -80,33 +80,39 @@ impl Speed { self.event_speeds.front() } - /// Getter of the mean time between transaction + /// Accumulate all event speeds as Duration /// - /// ΣtNt = mean - pub fn get_mean_duration(&self) -> Duration { - let mut mean_duration = Duration::default(); + /// Σt + fn accumulate_event_speeds(&self) -> Duration { + let mut duration = Duration::ZERO; let mut instant = Instant::now(); for event_speed in &self.event_speeds { - mean_duration += instant.duration_since(*event_speed); + duration += instant.duration_since(*event_speed); instant = *event_speed; } - mean_duration.div_f32(self.event_speeds.capacity() as f32) + duration + } + + /// Getter of the mean time between transaction + /// + /// ΣtNt = mean + pub fn get_mean_duration(&self) -> Duration { + if !self.event_speeds.is_empty() { + self.accumulate_event_speeds() + .div_f32(self.event_speeds.len() as f32) + } else { + Duration::ZERO + } } /// Getter of the current speed of transaction flow /// /// 1000 × NtΣt = TPS pub fn get_speed(&self) -> f64 { - let mut sum_duration = Duration::default(); - let mut instant = Instant::now(); - for event_speed in &self.event_speeds { - sum_duration += instant.duration_since(*event_speed); - instant = *event_speed; - } - + let sum_duration = self.accumulate_event_speeds(); if !sum_duration.is_zero() { - (1000 * self.event_speeds.capacity()) as f64 / sum_duration.as_millis() as f64 + (1000 * self.event_speeds.len()) as f64 / sum_duration.as_millis() as f64 } else { 0.0 } @@ -119,15 +125,9 @@ impl Speed { /// /// 1000 × NtTPS + overhead − Σt = duration pub fn get_duration_overhead(&self, tps: f64, overhead: Option) -> Duration { - let mut sum_duration = Duration::default(); - let mut instant = Instant::now(); - for event_speed in &self.event_speeds { - sum_duration += instant.duration_since(*event_speed); - instant = *event_speed; - } - let duration = Duration::from_millis(((1000 * self.event_speeds.len()) as f64 / tps) as u64); + let sum_duration = self.accumulate_event_speeds(); if let Some(overhead) = overhead { duration .saturating_add(overhead) diff --git a/prosa/src/inj/proc.rs b/prosa/src/inj/proc.rs index fc4947f..055f2a6 100644 --- a/prosa/src/inj/proc.rs +++ b/prosa/src/inj/proc.rs @@ -154,7 +154,7 @@ impl InjProc { debug!(name: "resp_inj_proc", target: "prosa::inj::proc", proc_name = name, service = msg.get_service(), response = format!("{:?}", msg.get_data())); adaptor.process_response(msg.get_data(), msg.get_service())?; - regulator.notify_receive_transaction(Duration::default()); + regulator.notify_receive_transaction(msg.elapsed()); // Build the next transaction let _ = next_transaction.get_or_insert(adaptor.build_transaction());