Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: add completed/incompleted flag in memdisk cache, unlock storage_manager #50

Merged
merged 20 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::fs;
use std::{fs, sync::Arc};

use bytes::Bytes;
use futures::StreamExt;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc::Receiver, Mutex};

use crate::{
disk::disk_manager::DiskManager, error::ParpulseResult, storage_reader::StorageReaderStream,
cache::{
data_store_cache::memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue},
replacer::DataStoreReplacer,
},
disk::disk_manager::DiskManager,
error::ParpulseResult,
storage_reader::StorageReaderStream,
};

/// TODO(lanlou): make them configurable.
Expand All @@ -22,7 +28,10 @@ pub struct DiskStore {

impl Drop for DiskStore {
fn drop(&mut self) {
fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed");
if fs::metadata(&self.base_path).is_ok() {
println!("Removing cache files: {:?}", self.base_path);
fs::remove_dir_all(self.base_path.clone()).expect("remove cache files failed");
}
}
}

Expand All @@ -32,7 +41,6 @@ impl DiskStore {
if !final_base_path.ends_with('/') {
final_base_path += "/";
}

Self {
disk_manager,
base_path: final_base_path,
Expand All @@ -43,10 +51,14 @@ impl DiskStore {
impl DiskStore {
/// Reads data from the disk store. The method returns a stream of data read from the disk
/// store.
pub async fn read_data(
pub async fn read_data<R>(
&self,
key: &str,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
disk_replacer: Arc<Mutex<R>>,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>>
where
R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue> + 'static,
{
// TODO(lanlou): we later may consider the remaining space to decide the buffer size
let mut buffer_size = self.disk_manager.file_size(key).await? as usize;
if buffer_size > MAX_DISK_READER_BUFFER_SIZE {
Expand All @@ -55,6 +67,7 @@ impl DiskStore {
// FIXME: Shall we consider the situation where the data is not found?
let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?;
let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE);
let key_str = key.to_string().clone();
tokio::spawn(async move {
loop {
match disk_stream.next().await {
Expand All @@ -67,6 +80,8 @@ impl DiskStore {
None => break,
}
}
// TODO(lanlou): when second read, so there is no need to unpin, how to improve?
disk_replacer.lock().await.unpin(&key_str);
});
Ok(Some(rx))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use bytes::Bytes;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc::Receiver, Mutex};

use crate::error::ParpulseResult;
use crate::{
cache::{
data_store_cache::memdisk::{MemDiskStoreReplacerKey, MemDiskStoreReplacerValue},
replacer::DataStoreReplacer,
},
error::ParpulseResult,
};

const DEFAULT_MEM_CHANNEL_BUFFER_SIZE: usize = 1024;

Expand All @@ -21,17 +27,27 @@ impl MemStore {
}
}

pub fn read_data(&self, key: &str) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
pub fn read_data<R>(
&self,
key: &str,
mem_replacer: Arc<Mutex<R>>,
) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>>
where
R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue> + 'static,
{
let key_value = self.data.get(key);
if key_value.is_none() {
return Ok(None);
}
let data_vec = key_value.unwrap().0.clone();
let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_MEM_CHANNEL_BUFFER_SIZE);
let key_str = key.to_string().clone();
tokio::spawn(async move {
for data in data_vec.iter() {
tx.send(Ok(data.clone())).await.unwrap();
}
// TODO(lanlou): when second read, so there is no need to unpin, how to improve?
mem_replacer.lock().await.unpin(&key_str);
});
Ok(Some(rx))
}
Expand Down Expand Up @@ -61,6 +77,8 @@ impl MemStore {

#[cfg(test)]
mod tests {
use crate::cache::replacer::lru::LruReplacer;

use super::*;

#[test]
Expand Down Expand Up @@ -90,7 +108,8 @@ mod tests {
assert_eq!(res3.as_ref().unwrap().0[1], bytes2_cp);
assert_eq!(res3.as_ref().unwrap().0[2], bytes3_cp);

let read_res = mem_store.read_data(key.as_str());
let dummy_replacer = Arc::new(Mutex::new(LruReplacer::new(0)));
let read_res = mem_store.read_data(key.as_str(), dummy_replacer);
assert!(read_res.is_ok());
assert!(read_res.unwrap().is_none());
}
Expand All @@ -104,7 +123,8 @@ mod tests {
let bytes_cp = bytes.clone();
let res = mem_store.write_data(key.clone(), bytes);
assert!(res.is_none());
let read_res = mem_store.read_data(key.as_str());
let dummy_replacer = Arc::new(Mutex::new(LruReplacer::new(0)));
let read_res = mem_store.read_data(key.as_str(), dummy_replacer);
assert!(read_res.is_ok());
let mut rx = read_res.unwrap().unwrap();
let mut bytes_vec = Vec::new();
Expand Down
Loading
Loading