Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
feat: write current data to disk and poll next data from S3 at the sa…
Browse files Browse the repository at this point in the history
…me time
  • Loading branch information
lanlou1554 committed Apr 17, 2024
1 parent 41ec309 commit 96c8091
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions storage-node/src/disk/disk_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use bytes::Bytes;
use futures::stream::StreamExt;
use futures::{future::TryFutureExt, join};

use std::future::IntoFuture;
use std::io::SeekFrom;

use std::path::{Path, PathBuf};
Expand All @@ -10,7 +12,7 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::AsyncSeekExt;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

use crate::error::ParpulseResult;
use crate::error::{ParpulseError, ParpulseResult};
use crate::storage_reader::StorageReaderStream;

use super::stream::DiskReadStream;
Expand Down Expand Up @@ -116,14 +118,23 @@ impl DiskManager {
}

if let Some(mut stream) = stream {
loop {
match stream.next().await {
Some(Ok(bytes)) => {
file.write_all(&bytes).await?;
bytes_written += bytes.len();
if let Some(Ok(mut bytes_cur)) = stream.next().await {
loop {
let disk_write_fut = TryFutureExt::into_future(file.write_all(&bytes_cur));
let bytes_next_fut = stream.next().into_future();
let res = join!(disk_write_fut, bytes_next_fut);
match res {
(Ok(_), Some(Ok(bytes_next))) => {
bytes_written += bytes_cur.len();
bytes_cur = bytes_next;
}
(Ok(_), None) => {
bytes_written += bytes_cur.len();
break;
}
(Err(e), _) => return Err(ParpulseError::Disk(e)),
(Ok(_), Some(Err(e))) => return Err(e),
}
Some(Err(e)) => return Err(e),
None => break,
}
}
}
Expand Down

0 comments on commit 96c8091

Please sign in to comment.