diff --git a/storage-node/src/cache/data_store/disk.rs b/storage-node/src/cache/data_store/disk.rs deleted file mode 100644 index 585e130..0000000 --- a/storage-node/src/cache/data_store/disk.rs +++ /dev/null @@ -1,75 +0,0 @@ -use bytes::Bytes; -use futures::StreamExt; -use tokio::sync::mpsc::Receiver; - -use crate::{ - disk::disk_manager::DiskManager, error::ParpulseResult, storage_reader::StorageReaderStream, -}; - -const DEFAULT_DISK_READER_BUFFER_SIZE: usize = 8192; -const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1024; - -/// [`DiskStore`] stores the contents of remote objects on the local disk. -pub struct DiskStore { - disk_manager: DiskManager, - /// The path to the directory where the data is stored on the disk. - base_path: String, -} - -impl DiskStore { - pub fn new(disk_manager: DiskManager, base_path: String) -> Self { - Self { - disk_manager, - base_path, - } - } -} - -impl DiskStore { - pub async fn read_data( - &self, - key: &str, - ) -> ParpulseResult>>> { - // FIXME: Shall we consider the situation where the data is not found? - let mut disk_stream = self - .disk_manager - .disk_read_stream(key, DEFAULT_DISK_READER_BUFFER_SIZE) - .await?; - let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE); - tokio::spawn(async move { - loop { - match disk_stream.next().await { - Some(Ok(bytes_read)) => { - tx.send(Ok(Bytes::from(disk_stream.buffer()[..bytes_read].to_vec()))) - .await - .unwrap(); - } - Some(Err(e)) => tx.send(Err(e)).await.unwrap(), - None => break, - } - } - }); - Ok(Some(rx)) - } - - pub async fn write_data( - &self, - key: String, - data: StorageReaderStream, - ) -> ParpulseResult { - // NOTE(Yuanxin): Shall we spawn a task to write the data to disk? - let bytes_written = self - .disk_manager - .write_bytes_and_stream_to_disk(data, &key) - .await?; - Ok(bytes_written) - } - - pub async fn clean_data(&self, key: &str) -> ParpulseResult<()> { - self.disk_manager.remove_file(key).await - } - - pub fn disk_store_key(&self, remote_location: &str) -> String { - format!("{}{}", self.base_path, remote_location) - } -} diff --git a/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs b/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs index cad4c20..7bd5066 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs @@ -99,6 +99,8 @@ impl DataStoreCache for MemDiskStoreCache remote_location: String, mut data_stream: StorageReaderStream, ) -> ParpulseResult { + // TODO: Refine the lock. + // TODO(lanlou): Also write the data to network. let mut bytes_to_disk = None; let mut evicted_bytes_to_disk: Option< Vec<(ParpulseDataStoreCacheKey, (Vec, usize))>, @@ -110,6 +112,7 @@ impl DataStoreCache for MemDiskStoreCache match data_stream.next().await { Some(Ok(bytes)) => { bytes_written += bytes.len(); + // Need to write the data to network. if let Some((bytes_vec, _)) = mem_store.write_data(mem_store_key.clone(), bytes) { @@ -146,6 +149,7 @@ impl DataStoreCache for MemDiskStoreCache } } + // Don't need to write the data to network for evicted keys. if let Some(evicted_bytes_to_disk) = evicted_bytes_to_disk { for (key, (bytes_vec, data_size)) in evicted_bytes_to_disk { let disk_store_key = self.disk_store.data_store_key(&key); @@ -162,6 +166,7 @@ impl DataStoreCache for MemDiskStoreCache } } + // Need to write the data to network for the current key. let disk_store_key = self.disk_store.data_store_key(&remote_location); // TODO: Write the data store cache first w/ `incompleted` state and update the state // after finishing writing into the data store.