Skip to content

Commit

Permalink
Make scheduling more robust against clock drift
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoxc committed Sep 23, 2024
1 parent 8cd3778 commit 9bd2717
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/crusader-lib/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
const MEASURE_DELAY: Duration = Duration::from_millis(50);

#[derive(Debug)]
struct ScheduledLoads {
time: Instant,
}
struct ScheduledLoads;

struct State {
downloads: Mutex<HashMap<TestStream, oneshot::Sender<()>>>,
Expand Down Expand Up @@ -306,13 +304,8 @@ pub(crate) async fn test_async(
.send(())
.map_err(|_| anyhow!("Failed to notify downloader"))?;
}
ServerMessage::ScheduledLoads { groups: _, time } => {
let time = Duration::from_micros(time.wrapping_add(server_time_offset));
scheduled_load_tx
.send(ScheduledLoads {
time: setup_start + time,
})
.await?
ServerMessage::ScheduledLoads { groups: _, time: _ } => {
scheduled_load_tx.send(ScheduledLoads).await?
}
_ => bail!("Unexpected message {:?}", reply),
};
Expand Down Expand Up @@ -351,7 +344,16 @@ pub(crate) async fn test_async(
state_tx.send((TestState::Grace1, start))?;
time::sleep(grace).await;

let load_delay = (Duration::from_millis(50) + latency).as_micros() as u64;
let load_delay_pure = Duration::from_millis(50);
let load_delay = (load_delay_pure + latency / 2).as_micros() as u64;

let start_time = || -> Result<Instant, anyhow::Error> {
Instant::now()
.checked_add(load_delay_pure)
.ok_or(anyhow!("Time overflow"))?
.checked_sub(latency / 2)
.ok_or(anyhow!("Time overflow"))
};

let mut test_data = Vec::new();

Expand All @@ -364,12 +366,12 @@ pub(crate) async fn test_async(
},
)
.await?;
let load = scheduled_load_rx
scheduled_load_rx
.recv()
.await
.ok_or(anyhow!("Failed to receive"))?;
let start = load.time;
state_tx.send((TestState::LoadFromServer, load.time))?;
let start = start_time()?;
state_tx.send((TestState::LoadFromServer, start))?;
msg(&format!("Testing download..."));
let _ = semaphore.acquire_many(loading_streams).await?;
let end = Instant::now();
Expand All @@ -391,12 +393,12 @@ pub(crate) async fn test_async(
},
)
.await?;
let load = scheduled_load_rx
scheduled_load_rx
.recv()
.await
.ok_or(anyhow!("Failed to receive"))?;
let start = load.time;
state_tx.send((TestState::LoadFromClient, load.time))?;
let start = start_time()?;
state_tx.send((TestState::LoadFromClient, start))?;
msg(&format!("Testing upload..."));

for _ in 0..config.streams {
Expand Down Expand Up @@ -429,12 +431,12 @@ pub(crate) async fn test_async(
},
)
.await?;
let load = scheduled_load_rx
scheduled_load_rx
.recv()
.await
.ok_or(anyhow!("Failed to receive"))?;
let start = load.time;
state_tx.send((TestState::LoadFromBoth, load.time))?;
let start = start_time()?;
state_tx.send((TestState::LoadFromBoth, start))?;
msg(&format!("Testing both download and upload..."));

for _ in 0..config.streams {
Expand Down

0 comments on commit 9bd2717

Please sign in to comment.