This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

feat: add memory cache #12

Merged Apr 3, 2024 · 20 commits

Changes from 5 commits
32 changes: 26 additions & 6 deletions storage-node/TODO.md
@@ -1,15 +1,35 @@
### Features
1. Add in-memory cache for small and hot data.
For the in-memory cache, we currently have two candidate designs for the procedure that writes data into the cache:
In general, the memory cache keeps a hashmap (key -> vector of `Bytes`) to record the data. (Note: in the current version, the file represents all the files for one S3 request; we should optimize this later.)
1. First Design:
1. Get the file size and file from S3.
2. Storage_manager checks the file size: if the file is small, it tries to put it into the memory cache (remembering to also write any evicted memory entries to disk, if applicable); otherwise, it writes the file into the disk cache.

(In this case, the disk cache and the memory cache could consume an `S3Reader` as the input parameter for `write_data`.)
2. Second Design:
1. Get the file from S3 (as an `S3Reader`).
2. Storage_manager converts the `S3Reader` into a stream and calls `next` to get `Bytes`, which it first tries to write to the memory cache.

The memory cache first adds the key into its hashmap and records the `Bytes`. When it finds the accumulated size is too large, it returns `false` along with the vector of `Bytes` recorded for this key.

Storage_manager then writes to disk the file whose memory-cache write returned `false`, as well as any evicted files.

(In this case, the disk cache and the memory cache should accept `Bytes` as the input parameter for `write_data`.)

To me, the first design is very clear, while the second design saves the cost of an extra S3 request to get the size; I think that cost is very low, though. Maybe the major difference is the API for `write_data`? (A sketch of the two candidate signatures follows below.)
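A minimal sketch of how the two `write_data` signatures might differ; `S3Reader` and the result alias here are placeholders, not the crate's real types:

```rust
use bytes::Bytes;

// Placeholder types for illustration only; the real `S3Reader` and
// error types are defined elsewhere in the storage node.
pub struct S3Reader;
pub type WriteResult = Result<usize, String>;

/// First design: the cache consumes the reader itself and pulls all the
/// bytes from S3 (the caller has already checked the file size).
pub trait WriteFromReader {
    fn write_data(&mut self, key: String, reader: S3Reader) -> WriteResult;
}

/// Second design: the storage manager drives the stream and feeds the
/// cache one `Bytes` chunk at a time, handling overflow on `false`.
pub trait WriteFromBytes {
    fn write_data(&mut self, key: String, chunk: Bytes) -> WriteResult;
}
```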
Member:
If we're going to send an additional HTTP request for every S3 request to get the file size, then the cost may not be negligible. But I do vote for the first design cuz it's more straightforward and less prone to bugs...
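(For reference, a rough sketch of that extra size lookup, assuming the `aws-sdk-s3` crate; it is one `HeadObject` round trip per request:)

```rust
use aws_sdk_s3::Client;

/// The extra round trip the first design pays before deciding whether
/// the object goes to the memory cache or the disk cache.
async fn object_size(client: &Client, bucket: &str, key: &str) -> Option<i64> {
    let head = client
        .head_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await
        .ok()?;
    head.content_length() // `Option<i64>` in recent SDK versions
}
```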

Collaborator Author:
Due to API constraints, I currently adopt method 2.
As expected, the code becomes very ugly : ( Let me think about how to refine it...

2. Integrate RocksDB.
3. Implement the client and its communication with the server.
4. Pin & unpin to prevent evicting a disk file that is in use. (Although this situation seems to rarely happen.)
### Optimization
1. For cache policies like `lru.rs` and `lru_k.rs`, we can use fine-grained locks instead of one big lock. If we use fine-grained locks (at least after getting the value from the cache, we no longer need to hold its lock), we should add pin/unpin in `lru.rs` and `lru_k.rs`.
2. Discuss the key for the cache. Currently it is the raw S3 request string, but maybe we should make it `bucket + one key`: if one key represents a file, then keys may overlap across different requests, and the current implementation would waste disk space.
Member:
Will the raw S3 request string be different for the same file? Isn't it something like `aws s3 cp s3://bucket-name/path/to/file.txt /local/path/file.txt`? In other words, if keys overlap, won't the requests also overlap? (Not sure, maybe wrong)

Collaborator Author:
I left this comment based on the current `s3.rs` test, so I am not sure what the raw S3 request string is... But if it is the same as in the `s3.rs` test (one S3 request string can be converted to one bucket + many keys, and each key represents one file), we should consider the cache key more carefully in the future. I added this todo just to remind us to double-check the format of the raw S3 request string.
Feel free to correct me, since I know almost nothing about S3 requests. I think it depends on whether one raw S3 request string contains only one file or multiple files.

Member:
> Will the raw S3 request string be different for the same file?

Yes. When we make a request to an S3 object, we must specify its bucket AND its key. So we need separate requests if a table spans multiple keys in a bucket.
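(For illustration, with the `aws-sdk-s3` crate each read names both parts explicitly, so a table spanning several keys needs one call per key:)

```rust
use aws_sdk_s3::Client;

// Sketch: one GetObject call per (bucket, key) pair. A table that spans
// multiple keys in a bucket therefore needs one request per key.
async fn fetch_one(
    client: &Client,
    bucket: &str,
    key: &str,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let resp = client.get_object().bucket(bucket).key(key).send().await?;
    let data = resp.body.collect().await?; // drain the body stream
    Ok(data.into_bytes().to_vec())
}
```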

Collaborator Author:
Let me correct myself: the raw S3 request string means the input param from the storage manager.
So a quick question: can the raw S3 request string in the storage manager be transformed into one bucket and multiple keys?

Member:
Maybe we have to talk to the catalog team to finalize the data schema.

3. Solve the read-write & write-write conflicts for disk operations. Say we have two requests with the same cache key: the first has a cache miss, so it goes to S3 to fetch the data. The second request then arrives before the first has pulled all the data from S3 and stored it on disk, so the disk file exists but its content is incomplete. One possible approach: when a cache miss happens, record the key in our cache structure with status `uncompleted`, and update the status to `completed` once the data is fully pulled and stored. When another request for the same key tries to `get` the disk file from the cache and finds the key with status `uncompleted`, the cache should wait until the status turns to `completed`. (The async design that keeps the cache from simply busy-waiting is worth discussing; see the sketch after this list.)
4. In the async approach, we should support reading and outputting data at the same time, for both S3 read & disk write (or network write) and disk read & network write. To support this, we cannot use one single fixed buffer, which is the current implementation of `disk_manager`. Here are two ways to support this:
1. a buffer that is consumed and extended (see `s3.rs`);
2. two fixed buffers.

(Apart from this, is there another way to make the async version outperform `sync` with `multiple threads`?)
5. Discuss other possibilities to parallel.
6. Design benchmark and add statistics for benchmark.
7. For streams and iterators, `buffer_size` is one thing we should consider. If it is too small, we have to do more frequent I/Os; if it is too big, we cannot serve many requests at the same time, since total memory is fixed. Maybe we can adjust `buffer_size` dynamically.
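A minimal sketch of the `uncompleted`/`completed` idea from item 3, assuming tokio; `StatusTable`, `start_fill`, and the `watch`-channel wakeup are illustrative, not the actual design:

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Uncompleted,
    Completed,
}

/// Tracks in-flight cache fills. The task fetching from S3 holds the
/// `watch::Sender` and flips the status once the disk write finishes.
#[derive(Default)]
struct StatusTable {
    entries: Mutex<HashMap<String, watch::Receiver<Status>>>,
}

impl StatusTable {
    /// Called on a cache miss, before fetching from S3.
    fn start_fill(&self, key: &str) -> watch::Sender<Status> {
        let (tx, rx) = watch::channel(Status::Uncompleted);
        self.entries.lock().unwrap().insert(key.to_string(), rx);
        tx
    }

    /// Wait, without busy-waiting, until `key` is fully written to disk.
    async fn wait_completed(&self, key: &str) {
        // Clone the receiver out so the lock is not held across `.await`.
        let rx = self.entries.lock().unwrap().get(key).cloned();
        if let Some(mut rx) = rx {
            // Returns immediately if the status is already `Completed`;
            // otherwise sleeps until the filling task updates it.
            let _ = rx.wait_for(|s| *s == Status::Completed).await;
        }
    }
}
```

The filling task would call `tx.send(Status::Completed)` after the last disk write, waking every waiter at once.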
29 changes: 0 additions & 29 deletions storage-node/src/cache/data/mod.rs

This file was deleted.

@@ -1,4 +1,3 @@
-use async_trait::async_trait;
 use bytes::Bytes;
 use futures::StreamExt;
 use tokio::sync::mpsc::Receiver;
@@ -7,8 +6,6 @@ use crate::{
     disk::disk_manager::DiskManager, error::ParpulseResult, storage_reader::StorageReaderStream,
 };
 
-use super::DataStore;
-
 const DEFAULT_DISK_READER_BUFFER_SIZE: usize = 8192;
 const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1024;
 
@@ -28,9 +25,8 @@ impl DiskStore {
     }
 }
 
-#[async_trait]
-impl DataStore for DiskStore {
-    async fn read_data(
+impl DiskStore {
+    pub async fn read_data(
         &self,
         key: &str,
     ) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
@@ -56,20 +52,24 @@ impl DataStore for DiskStore {
         Ok(Some(rx))
     }
 
-    async fn write_data(&self, key: String, data: StorageReaderStream) -> ParpulseResult<usize> {
+    pub async fn write_data(
+        &self,
+        key: String,
+        data: StorageReaderStream,
+    ) -> ParpulseResult<usize> {
         // NOTE(Yuanxin): Shall we spawn a task to write the data to disk?
         let bytes_written = self
             .disk_manager
-            .write_stream_reader_to_disk(data, &key)
+            .write_bytes_and_stream_to_disk(data, &key)
             .await?;
         Ok(bytes_written)
     }
 
-    async fn clean_data(&self, key: &str) -> ParpulseResult<()> {
+    pub async fn clean_data(&self, key: &str) -> ParpulseResult<()> {
         self.disk_manager.remove_file(key).await
     }
 
-    fn data_store_key(&self, remote_location: &str) -> String {
+    pub fn disk_store_key(&self, remote_location: &str) -> String {
         format!("{}{}", self.base_path, remote_location)
     }
 }
181 changes: 181 additions & 0 deletions storage-node/src/cache/data_store_cache/memdisk/cache_manager.rs
@@ -0,0 +1,181 @@
use futures::stream::StreamExt;
use tokio::sync::RwLock;

use crate::{
    cache::{
        data_store_cache::DataStoreCache,
        policy::{DataStoreReplacer, ParpulseDataStoreCacheKey},
    },
    disk::disk_manager::DiskManager,
    error::{ParpulseError, ParpulseResult},
    storage_reader::StorageReaderStream,
};

use super::data_store::{disk::DiskStore, memory::MemStore};
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::mpsc::Receiver;

/// The default maximum single file size for the memory cache.
/// If the file size exceeds this value, the file will be stored in the disk cache.
/// TODO(lanlou): make this value configurable.
pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512;

pub struct MemDiskStoreCache<R: DataStoreReplacer + Send + Sync> {
    disk_store: DiskStore,
    mem_store: Option<MemStore>,
    /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size)
    disk_replacer: RwLock<R>,
    mem_replacer: Option<RwLock<R>>,
}

impl<R: DataStoreReplacer + Send + Sync> MemDiskStoreCache<R> {
    pub fn new(
        disk_replacer: R,
        disk_base_path: String,
        mem_replacer: Option<R>,
        mem_max_file_size: Option<usize>,
    ) -> Self {
        let disk_manager = DiskManager::default();
        let disk_store = DiskStore::new(disk_manager, disk_base_path);

        if mem_replacer.is_some() {
            let mem_store =
                MemStore::new(mem_max_file_size.unwrap_or(DEFAULT_MEM_CACHE_MAX_FILE_SIZE));
            MemDiskStoreCache {
                disk_store,
                mem_store: Some(mem_store),
                disk_replacer: RwLock::new(disk_replacer),
                mem_replacer: Some(RwLock::new(mem_replacer.unwrap())),
            }
        } else {
            MemDiskStoreCache {
                disk_store,
                mem_store: None,
                disk_replacer: RwLock::new(disk_replacer),
                mem_replacer: None,
            }
        }
    }
}

#[async_trait]
impl<R: DataStoreReplacer + Send + Sync> DataStoreCache for MemDiskStoreCache<R> {
    async fn get_data_from_cache(
        &mut self,
        remote_location: String,
    ) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
        // TODO: Refine the lock.
        if let Some(mem_store) = &self.mem_store {
            let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await;
            if let Some((data_store_key, _)) = mem_replacer.get(&remote_location) {
                match mem_store.read_data(data_store_key) {
                    Ok(Some(rx)) => return Ok(Some(rx)),
                    Ok(None) => {
                        return Err(ParpulseError::Internal(
                            "Memory replacer and memory store keys are inconsistent.".to_string(),
                        ))
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        let mut disk_replacer = self.disk_replacer.write().await;
        match disk_replacer.get(&remote_location) {
            Some((data_store_key, _)) => {
                if let Some(data) = self.disk_store.read_data(data_store_key).await? {
                    Ok(Some(data))
                } else {
                    unreachable!()
                }
            }
            None => Ok(None),
        }
    }

    async fn put_data_to_cache(
        &mut self,
        remote_location: String,
        mut data_stream: StorageReaderStream,
    ) -> ParpulseResult<usize> {
        let mut bytes_to_disk = None;
        let mut evicted_bytes_to_disk: Option<
            Vec<(ParpulseDataStoreCacheKey, (Vec<Bytes>, usize))>,
        > = None;
        if let Some(mem_store) = &mut self.mem_store {
            let mem_store_key = mem_store.data_store_key(&remote_location);
            let mut bytes_written = 0;
            loop {
                match data_stream.next().await {
                    Some(Ok(bytes)) => {
                        bytes_written += bytes.len();
                        if let Some((bytes_vec, _)) =
                            mem_store.write_data(mem_store_key.clone(), bytes)
                        {
                            bytes_to_disk = Some(bytes_vec);
                            break;
                        }
                    }
                    Some(Err(e)) => return Err(e),
                    None => break,
                }
            }

            if bytes_to_disk.is_none() {
                let mut mem_replacer = self.mem_replacer.as_ref().unwrap().write().await;
                let (status, evicted_keys) = mem_replacer.put(
                    remote_location.clone(),
                    (mem_store_key.clone(), bytes_written),
                );
                if !status {
                    // Put the data to disk cache.
                    bytes_to_disk = Some(mem_store.clean_data(&mem_store_key).unwrap().0);
                }
                if let Some(evicted_keys) = evicted_keys {
                    evicted_bytes_to_disk = Some(
                        evicted_keys
                            .iter()
                            .map(|key| {
                                let (bytes_vec, data_size) = mem_store.clean_data(key).unwrap();
                                (key.clone(), (bytes_vec, data_size))
                            })
                            .collect(),
                    );
                }
            }
        }

        if let Some(evicted_bytes_to_disk) = evicted_bytes_to_disk {
            for (key, (bytes_vec, data_size)) in evicted_bytes_to_disk {
                let disk_store_key = self.disk_store.data_store_key(&key);
                self.disk_store
                    .write_data(disk_store_key.clone(), Some(bytes_vec), None)
                    .await?;
                let mut disk_store_cache = self.disk_replacer.write().await;
                // Register the evicted entry under its own key, not the
                // current request's location.
                if !disk_store_cache
                    .put(key.clone(), (disk_store_key.clone(), data_size))
                    .0
                {
                    self.disk_store.clean_data(&disk_store_key).await?;
                }
            }
        }

        let disk_store_key = self.disk_store.data_store_key(&remote_location);
        // TODO: Write the data store cache first w/ `uncompleted` state and update the state
        // after finishing writing into the data store.
        let data_size = self
            .disk_store
            .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream))
            .await?;
        let mut data_store_cache = self.disk_replacer.write().await;
        if !data_store_cache
            .put(remote_location, (disk_store_key.clone(), data_size))
            .0
        {
            self.disk_store.clean_data(&disk_store_key).await?;
        }
        Ok(data_size)
    }
}
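A rough usage sketch of the cache above, assuming some `LruReplacer` that implements `DataStoreReplacer` (the replacer name and everything outside `MemDiskStoreCache`'s own methods are illustrative):

```rust
// Hypothetical wiring; only `MemDiskStoreCache`'s own methods are taken
// from the code above, everything else is assumed for the example.
async fn serve(
    mut cache: MemDiskStoreCache<LruReplacer>,
    location: String,
    s3_stream: StorageReaderStream,
) -> ParpulseResult<()> {
    if let Some(mut rx) = cache.get_data_from_cache(location.clone()).await? {
        // Cache hit: chunks arrive over the channel.
        while let Some(chunk) = rx.recv().await {
            let _bytes = chunk?; // forward to the client here
        }
    } else {
        // Cache miss: drain the S3 stream into the mem/disk cache.
        let size = cache.put_data_to_cache(location, s3_stream).await?;
        println!("cached {size} bytes");
    }
    Ok(())
}
```

Construction takes the disk replacer, the disk base path, and optionally a memory replacer plus a per-file size cap, e.g. `MemDiskStoreCache::new(disk_lru, "/tmp/cache/".into(), Some(mem_lru), None)`.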
77 changes: 77 additions & 0 deletions storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs
@@ -0,0 +1,77 @@
use bytes::Bytes;
use futures::StreamExt;
use tokio::sync::mpsc::Receiver;

use crate::{
    disk::disk_manager::DiskManager, error::ParpulseResult, storage_reader::StorageReaderStream,
};

/// TODO(lanlou): make them configurable.
const DEFAULT_DISK_READER_BUFFER_SIZE: usize = 8192;
const DEFAULT_DISK_CHANNEL_BUFFER_SIZE: usize = 1024;

/// [`DiskStore`] stores the contents of remote objects on the local disk.
pub struct DiskStore {
    disk_manager: DiskManager,
    /// The path to the directory where the data is stored on the disk.
    base_path: String,
}

impl DiskStore {
    pub fn new(disk_manager: DiskManager, base_path: String) -> Self {
        Self {
            disk_manager,
            base_path,
        }
    }
}

impl DiskStore {
    pub async fn read_data(
        &self,
        key: &str,
    ) -> ParpulseResult<Option<Receiver<ParpulseResult<Bytes>>>> {
        // FIXME: Shall we consider the situation where the data is not found?
        let mut disk_stream = self
            .disk_manager
            .disk_read_stream(key, DEFAULT_DISK_READER_BUFFER_SIZE)
            .await?;
        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE);
        tokio::spawn(async move {
            loop {
                match disk_stream.next().await {
                    Some(Ok(bytes_read)) => {
                        tx.send(Ok(Bytes::from(disk_stream.buffer()[..bytes_read].to_vec())))
                            .await
                            .unwrap();
                    }
                    Some(Err(e)) => tx.send(Err(e)).await.unwrap(),
                    None => break,
                }
            }
        });
        Ok(Some(rx))
    }

    pub async fn write_data(
        &self,
        key: String,
        bytes_vec: Option<Vec<Bytes>>,
        stream: Option<StorageReaderStream>,
    ) -> ParpulseResult<usize> {
        // NOTE(Yuanxin): Shall we spawn a task to write the data to disk?
        let bytes_written = self
            .disk_manager
            .write_bytes_and_stream_to_disk(bytes_vec, stream, &key)
            .await?;
        Ok(bytes_written)
    }

    pub async fn clean_data(&self, key: &str) -> ParpulseResult<()> {
        self.disk_manager.remove_file(key).await
    }

    pub fn data_store_key(&self, remote_location: &str) -> String {
        format!("{}{}", self.base_path, remote_location)
    }
}
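A small usage sketch of `write_data` above: the memdisk cache can hand over chunks that were already buffered in memory, the remainder of a reader stream, or both (values here are illustrative):

```rust
use bytes::Bytes;

async fn spill_example(disk_store: &DiskStore) -> ParpulseResult<()> {
    // Spill two chunks evicted from the memory cache; there is no
    // remaining S3 stream to drain, so pass `None` for it.
    let key = disk_store.data_store_key("bucket/key");
    let written = disk_store
        .write_data(
            key,
            Some(vec![Bytes::from("hello "), Bytes::from("world")]),
            None,
        )
        .await?;
    assert_eq!(written, 11); // "hello world" is 11 bytes
    Ok(())
}
```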