Add early rejection of IOs if too many Downstairs are inactive #1565

Open · wants to merge 2 commits into main
12 changes: 12 additions & 0 deletions upstairs/src/downstairs.rs
@@ -3261,6 +3261,18 @@ impl Downstairs {
}
}

/// Returns the number of clients that can accept IO
///
/// A client can accept IO if it is in the `Active` or `LiveRepair` state.
pub fn active_client_count(&self) -> usize {
self.clients
.iter()
.filter(|c| {
matches!(c.state(), DsState::Active | DsState::LiveRepair)
Contributor commented on this line:

I think we need to allow additional states here. Offline, for instance, is one where we have not yet decided what the situation is; it could just be a downstairs rebooting, and soon enough (fingers crossed) it will come back.

There may be other transitory states to consider as well.

@mkeeter (Contributor, Author) replied on Nov 22, 2024:

Good point, I'll revisit this. In parallel, I've been working on simplifying the state machine (#1568 and #1570), so it might make sense to bring them in first.

})
.count()
}
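
As a rough sketch of the reviewer's suggestion (not part of this PR's diff), the count could also tolerate transitory states such as `DsState::Offline`, so a rebooting downstairs does not immediately push the Upstairs into early rejection. Toy types stand in for the real `Downstairs`/`DsState`, and the helper name is hypothetical:

    // Self-contained sketch, assuming Offline should still count toward quorum.
    #[allow(dead_code)]
    enum State {
        Active,
        LiveRepair,
        Offline,
        Faulted,
    }

    fn usable_client_count(states: &[State]) -> usize {
        states
            .iter()
            .filter(|s| {
                // Active and LiveRepair can take IO now; Offline is treated as
                // "may come back soon" rather than permanently gone.
                matches!(s, State::Active | State::LiveRepair | State::Offline)
            })
            .count()
    }

    fn main() {
        let ds = [State::Active, State::Offline, State::Faulted];
        assert_eq!(usable_client_count(&ds), 2);
    }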

/// Wrapper for marking a single job as done from the given client
///
/// This can be used to test handling of job acks, etc
239 changes: 237 additions & 2 deletions upstairs/src/dummy_downstairs_tests.rs
@@ -31,6 +31,7 @@ use crucible_protocol::JobId;
use crucible_protocol::Message;
use crucible_protocol::ReadBlockContext;
use crucible_protocol::ReadResponseHeader;
use crucible_protocol::SnapshotDetails;
use crucible_protocol::WriteHeader;

use bytes::BytesMut;
@@ -289,6 +290,35 @@ impl DownstairsHandle {
}
}

/// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
///
/// Returns the flush number for further checks.
///
/// # Panics
/// If a non-flush message arrives
pub async fn err_flush(&mut self) -> u64 {
match self.recv().await.unwrap() {
Message::Flush {
job_id,
flush_number,
upstairs_id,
..
} => {
self.send(Message::FlushAck {
upstairs_id,
session_id: self.upstairs_session_id.unwrap(),
job_id,
result: Err(CrucibleError::IoError("oh no".to_string())),
})
.unwrap();
flush_number
}
m => {
panic!("saw non flush {m:?}");
}
}
}

/// Awaits a `Message::Write { .. }` and sends a `WriteAck`
///
/// Returns the job ID for further checks.
@@ -311,6 +341,23 @@ impl DownstairsHandle {
}
}

/// Awaits a `Message::Write` and sends a `WriteAck` with an `IoError`
pub async fn err_write(&mut self) -> JobId {
match self.recv().await.unwrap() {
Message::Write { header, .. } => {
self.send(Message::WriteAck {
upstairs_id: header.upstairs_id,
session_id: self.upstairs_session_id.unwrap(),
job_id: header.job_id,
result: Err(CrucibleError::IoError("oh no".to_string())),
})
.unwrap();
header.job_id
}
m => panic!("saw non write: {m:?}"),
}
}

/// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
///
/// Returns the job ID for further checks.
@@ -358,7 +405,7 @@ impl DownstairsHandle {
job_id,
blocks: Ok(vec![block]),
},
data: data.clone(),
data,
})
.unwrap();
job_id
@@ -811,7 +858,7 @@ async fn run_live_repair(mut harness: TestHarness) {
job_id,
blocks: Ok(vec![block]),
},
data: data.clone(),
data,
}) {
Ok(()) => panic!("DS1 should be disconnected"),
Err(e) => {
@@ -2890,3 +2937,191 @@ async fn test_bytes_based_barrier() {
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
}

/// Test for early rejection of writes if > 1 Downstairs is unavailable
#[tokio::test]
async fn fast_write_rejection() {
let mut harness = TestHarness::new().await;

let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();

harness.ds1().err_write().await;
harness.ds2.ack_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Send a second write, which should still work (because we have 2/3 ds)
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds2.err_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Subsequent writes should be rejected immediately
let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
assert!(
matches!(r, Err(CrucibleError::IoError(..))),
"expected IoError, got {r:?}"
);
}

/// Make sure reads work with only 1x Downstairs
#[tokio::test]
async fn read_with_one_fault() {
let mut harness = TestHarness::new().await;

// Use a write to fault DS0 (XXX why do read errors not fault a DS?)
let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds1().err_write().await;
harness.ds2.ack_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Check that reads still work
let h = harness.spawn(|guest| async move {
let mut buffer = Buffer::new(1, 512);
guest.read(BlockIndex(0), &mut buffer).await.unwrap();
});
harness.ds2.ack_read().await;
h.await.unwrap(); // one good reply is enough for the read to return
harness.ds3.ack_read().await;

// Take out DS1 next
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds2.err_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Reads still work with 1x Downstairs
let h = harness.spawn(|guest| async move {
let mut buffer = Buffer::new(1, 512);
guest.read(BlockIndex(0), &mut buffer).await.unwrap();
});
harness.ds3.ack_read().await;
h.await.unwrap(); // one good reply is enough for the read to return
}

/// Test early rejection of reads with 0x running Downstairs
#[tokio::test]
async fn fast_read_rejection() {
let mut harness = TestHarness::new().await;

// Use writes to fault all three Downstairs (XXX why do read errors not fault a DS?)
let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds1().err_write().await;
harness.ds2.err_write().await;
harness.ds3.err_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
assert_eq!(ds[ClientId::new(2)], DsState::Faulted);

// Reads should return errors immediately
let mut buffer = Buffer::new(1, 512);
match harness.guest.read(BlockIndex(0), &mut buffer).await {
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}
}

/// Test for early rejection of flushes
#[tokio::test]
async fn fast_flush_rejection() {
let mut harness = TestHarness::new().await;

let h = harness.spawn(|guest| async move {
guest.flush(None).await.unwrap();
});
harness.ds1().err_flush().await;
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
h.await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// A flush with snapshot should fail immediately
match harness
.guest
.flush(Some(SnapshotDetails {
snapshot_name: "hiiiii".to_string(),
}))
.await
{
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}

// A non-snapshot flush should still succeed
let h = harness.spawn(|guest| async move {
guest.flush(None).await.unwrap();
});
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
h.await.unwrap();

// Use a flush to take out another downstairs
let h = harness.spawn(|guest| async move { guest.flush(None).await });
harness.ds2.ack_flush().await;
harness.ds3.err_flush().await;
let r = h.await.unwrap();
assert!(r.is_err());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Faulted);

// Subsequent flushes should fail immediately
match harness.guest.flush(None).await {
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}
}
5 changes: 5 additions & 0 deletions upstairs/src/lib.rs
@@ -1009,6 +1009,11 @@ impl DownstairsIO {

let bad_job = match &self.work {
IOop::Read { .. } => wc.done == 0,
// Flushes with snapshots must be good on all 3x Downstairs
IOop::Flush {
snapshot_details: Some(..),
..
} => wc.skipped + wc.error > 0,
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Flush { .. }
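A tiny standalone illustration of the new arm (not crucible's actual types; the field names simply follow the `wc.done`/`wc.skipped`/`wc.error` usage above): a snapshot flush is reported bad as soon as any Downstairs skipped or errored, while a read is only bad if nobody completed it.

    // Standalone sketch of the ack-counting rule added above.
    struct Counts {
        done: usize,
        skipped: usize,
        error: usize,
    }

    fn snapshot_flush_is_bad(wc: &Counts) -> bool {
        // A snapshot must land on all three Downstairs.
        wc.skipped + wc.error > 0
    }

    fn read_is_bad(wc: &Counts) -> bool {
        // A read needs only one good response.
        wc.done == 0
    }

    fn main() {
        // Two good acks and one error: the snapshot flush is still bad.
        assert!(snapshot_flush_is_bad(&Counts { done: 2, skipped: 0, error: 1 }));
        // One good response is enough for a read, even with two skips.
        assert!(!read_is_bad(&Counts { done: 1, skipped: 2, error: 0 }));
    }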
28 changes: 28 additions & 0 deletions upstairs/src/upstairs.rs
@@ -1140,6 +1140,15 @@ impl Upstairs {
done.send_err(CrucibleError::UpstairsInactive);
return;
}

let n = self.downstairs.active_client_count();
let required = if snapshot_details.is_some() { 3 } else { 2 };
if n < required {
done.send_err(CrucibleError::IoError(format!(
"too many inactive clients: need {required}, got {n}"
)));
return;
}
self.submit_flush(Some(done), snapshot_details, Some(io_guard));
}
BlockOp::ReplaceDownstairs { id, old, new, done } => {
@@ -1354,6 +1363,17 @@ impl Upstairs {
return;
}

let n = self.downstairs.active_client_count();
if n < 1 {
res.send_err((
data,
CrucibleError::IoError(format!(
"too many inactive clients: need 1, got {n}"
)),
));
return;
}

/*
* Get the next ID for the guest work struct we will make at the
* end. This ID is also put into the IO struct we create that
@@ -1470,6 +1490,14 @@
return None;
}

let n = self.downstairs.active_client_count();
if n < 2 {
res.send_err(CrucibleError::IoError(format!(
"too many inactive clients: need 2, got {n}"
)));
return None;
}

/*
* Verify IO is in range for our region
*/
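
Summarizing the submission-time guards added in this file (the helper below is a hypothetical sketch, not part of the PR): reads are rejected early only when no client can accept IO, writes and plain flushes need two, and a flush carrying a snapshot needs all three.

    // Hypothetical helper restating the thresholds enforced above.
    fn required_active_clients(is_read: bool, is_snapshot_flush: bool) -> usize {
        if is_snapshot_flush {
            3 // a snapshot must be written by every Downstairs
        } else if is_read {
            1 // any single good response satisfies a read
        } else {
            2 // writes and plain flushes need two of three
        }
    }

    fn main() {
        assert_eq!(required_active_clients(false, true), 3);
        assert_eq!(required_active_clients(true, false), 1);
        assert_eq!(required_active_clients(false, false), 2);
    }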