diff --git a/storage-node/src/disk/disk_manager.rs b/storage-node/src/disk/disk_manager.rs index a211244..9e11bd4 100644 --- a/storage-node/src/disk/disk_manager.rs +++ b/storage-node/src/disk/disk_manager.rs @@ -1,6 +1,8 @@ use bytes::Bytes; use futures::stream::StreamExt; +use futures::{future::TryFutureExt, join}; +use std::future::IntoFuture; use std::io::SeekFrom; use std::path::{Path, PathBuf}; @@ -10,7 +12,7 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::AsyncSeekExt; use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; -use crate::error::ParpulseResult; +use crate::error::{ParpulseError, ParpulseResult}; use crate::storage_reader::StorageReaderStream; use super::stream::DiskReadStream; @@ -116,14 +118,23 @@ impl DiskManager { } if let Some(mut stream) = stream { - loop { - match stream.next().await { - Some(Ok(bytes)) => { - file.write_all(&bytes).await?; - bytes_written += bytes.len(); + if let Some(Ok(mut bytes_cur)) = stream.next().await { + loop { + let disk_write_fut = TryFutureExt::into_future(file.write_all(&bytes_cur)); + let bytes_next_fut = stream.next().into_future(); + let res = join!(disk_write_fut, bytes_next_fut); + match res { + (Ok(_), Some(Ok(bytes_next))) => { + bytes_written += bytes_cur.len(); + bytes_cur = bytes_next; + } + (Ok(_), None) => { + bytes_written += bytes_cur.len(); + break; + } + (Err(e), _) => return Err(ParpulseError::Disk(e)), + (Ok(_), Some(Err(e))) => return Err(e), } - Some(Err(e)) => return Err(e), - None => break, } } }