From ddf2d75e7d46dd4dde10436914999972781d08e4 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Tue, 30 Apr 2024 11:10:51 -0400 Subject: [PATCH 1/4] fix: fix bugs for considering eviction in parallel --- .../memdisk/data_store/disk.rs | 10 +- .../src/cache/data_store_cache/memdisk/mod.rs | 264 +++++++++++------- storage-node/src/storage_manager.rs | 69 +++++ 3 files changed, 243 insertions(+), 100 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs index c9a7189..adae41b 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/data_store/disk.rs @@ -56,6 +56,7 @@ impl DiskStore { &self, key: &str, disk_replacer: Arc>, + key_replacer: String, ) -> ParpulseResult>>> where R: DataStoreReplacer + 'static, @@ -68,7 +69,6 @@ impl DiskStore { // FIXME: Shall we consider the situation where the data is not found? let mut disk_stream = self.disk_manager.disk_read_stream(key, buffer_size).await?; let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_DISK_CHANNEL_BUFFER_SIZE); - let key_str = key.to_string().clone(); tokio::spawn(async move { loop { match disk_stream.next().await { @@ -78,11 +78,13 @@ impl DiskStore { .unwrap(); } Some(Err(e)) => tx.send(Err(e)).await.unwrap(), - None => break, + None => { + // TODO(lanlou): when second read, so there is no need to unpin, how to improve? + disk_replacer.lock().await.unpin(&key_replacer); + break; + } } } - // TODO(lanlou): when second read, so there is no need to unpin, how to improve? - disk_replacer.lock().await.unpin(&key_str); }); Ok(Some(rx)) } diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 0d2eb96..7a3dde0 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -33,7 +33,7 @@ pub type MemDiskStoreReplacerKey = String; /// Status -> completed/incompleted; usize -> file_size /// Notify -> notify; usize -> notify_waiter_count /// FIXME: make (Notify, Mutex) a struct to make it more readable. -type StatusKeyHashMap = HashMap)>)>; +type StatusKeyHashMap = HashMap)>; pub struct MemDiskStoreReplacerValue { /// The path to the data store. For mem store, it should be data's s3 path. 
For disk @@ -65,7 +65,21 @@ impl ReplacerValue for MemDiskStoreReplacerValue { } } -#[derive(Clone)] +struct MemDiskStoreNotify { + inner: Notify, + waiter_count: Mutex, +} + +impl MemDiskStoreNotify { + fn new() -> Self { + MemDiskStoreNotify { + inner: Notify::new(), + waiter_count: Mutex::new(0), + } + } +} + +#[derive(Clone, PartialEq)] enum Status { Incompleted, Completed, @@ -130,6 +144,55 @@ impl> } } } + + async fn notify_waiters_error(&self, key: &str) { + let notify; + { + let mut status_of_keys = self.status_of_keys.write().await; + notify = status_of_keys.remove(key).unwrap().1; + } + notify.inner.notify_waiters(); + } + + async fn notify_waiters_mem(&self, key: &String, bytes_mem_written: usize) { + let notify; + { + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap(); + *size = bytes_mem_written; + { + let notify_waiter = notify_ref.waiter_count.lock().await; + if *notify_waiter > 0 { + self.mem_replacer + .as_ref() + .unwrap() + .lock() + .await + .pin(key, *notify_waiter); + } + } + notify = notify_ref.clone(); + *status = Status::Completed; + } + // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock + notify.inner.notify_waiters(); + } + + async fn notify_waiters_disk(&self, key: &String, bytes_disk_written: usize) { + let notify; + { + let mut status_of_keys = self.status_of_keys.write().await; + let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap(); + *status = Status::Completed; + *size = bytes_disk_written; + let notify_waiter = notify_ref.waiter_count.lock().await; + if *notify_waiter > 0 { + self.disk_replacer.lock().await.pin(key, *notify_waiter); + } + notify = notify_ref.clone(); + } + notify.inner.notify_waiters(); + } } #[async_trait] @@ -180,7 +243,11 @@ impl> D } if let Some(data) = self .disk_store - .read_data(&data_store_key, self.disk_replacer.clone()) + .read_data( + &data_store_key, + self.disk_replacer.clone(), + remote_location.clone(), + ) .await? { Ok(Some(data)) @@ -208,10 +275,10 @@ impl> D match status { Status::Incompleted => { notify = notify_ref.clone(); - *notify_ref.1.lock().await += 1; + *notify_ref.waiter_count.lock().await += 1; // The Notified future is guaranteed to receive wakeups from notify_waiters() // as soon as it has been created, even if it has not yet been polled. - let notified = notify.0.notified(); + let notified = notify.inner.notified(); drop(status_of_keys); notified.await; } @@ -234,7 +301,7 @@ impl> D remote_location.clone(), ( (Status::Incompleted, 0), - Arc::new((Notify::new(), Mutex::new(0))), + Arc::new(MemDiskStoreNotify::new()), ), ); } @@ -243,9 +310,7 @@ impl> D // TODO(lanlou): Also write the data to network. let mut bytes_to_disk = None; let mut bytes_mem_written = 0; - let mut evicted_bytes_to_disk: Option, usize))>> = - None; - // 1. If the mem_store is enabled, first try to write the data to memory. + // If the mem_store is enabled, first try to write the data to memory. // Note: Only file which size < mem_max_file_size can be written to memory. if let Some(mem_store) = &self.mem_store { loop { @@ -268,7 +333,7 @@ impl> D Some(Err(e)) => { // TODO(lanlou): Every time it returns an error, I need to manually add this clean code... // How to improve? 
- self.status_of_keys.write().await.remove(&remote_location); + self.notify_waiters_error(&remote_location).await; return Err(e); } None => break, @@ -300,72 +365,83 @@ impl> D // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. if let Some(evicted_keys) = replacer_put_status { - let mut evicted_bytes_to_disk_inner = Vec::new(); let mut mem_store = mem_store.write().await; + // TODO(lanlou): I have to grab this huge lock when evicting from mem to disk, since otherwise another + // thread may have the same evict_key as the new coming request, and it will put `Incompleted` into status_of_keys, + // and there is conflict. (maybe add incompleted status for evicted keys, but tricky) + let mut status_of_keys = self.status_of_keys.write().await; + drop(mem_replacer); + // Put the evicted keys to disk cache. for evicted_key in evicted_keys { - evicted_bytes_to_disk_inner.push(( + if let Some(((status, _), _)) = status_of_keys.get(&evicted_key.clone()) + { + // If the key is still in the status_of_keys, it means the data is still in progress. + // We should not put the data to disk cache. + if *status == Status::Incompleted { + continue; + } + } + + let (bytes_vec, data_size) = + mem_store.clean_data(&evicted_key).unwrap(); + assert_ne!(evicted_key, remote_location); + let disk_store_key = self.disk_store.data_store_key(&evicted_key); + if self + .disk_store + .write_data(disk_store_key.clone(), Some(bytes_vec), None) + .await + .is_err() + { + warn!("Failed to write evicted data to disk for {}", evicted_key); + continue; + } + let mut disk_replacer = self.disk_replacer.lock().await; + match disk_replacer.put( evicted_key.clone(), - mem_store.clean_data(&evicted_key).unwrap(), - )); + MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), + ) { + Some(evicted_keys) => { + // evict data from disk + for evicted_key in evicted_keys { + // FIXME: I think all status of the evicted_key removed here should be + // `Completed`? + status_of_keys.remove(&evicted_key); + self.disk_store + .clean_data( + &self.disk_store.data_store_key(&evicted_key), + ) + .await?; + } + } + None => { + if let Err(e) = + self.disk_store.clean_data(&disk_store_key).await + { + warn!( + "Failed to clean data ({}) from disk store: {}", + disk_store_key, e + ); + } + // It is not main in-progress thread for evicted_key, and its status should be + // `Completed`, so there is totally no need to notify the waiters. + // FIXME: can we safely remove evicted_key here? + status_of_keys.remove(&evicted_key); + } + } } - evicted_bytes_to_disk = Some(evicted_bytes_to_disk_inner); } } } - } - // 2. If applicable, put all the evicted data into disk cache. - // Don't need to write the data to network for evicted keys. 
- if let Some(evicted_bytes_to_disk) = evicted_bytes_to_disk { - for (remote_location_evicted, (bytes_vec, data_size)) in evicted_bytes_to_disk { - assert_ne!(remote_location_evicted, remote_location); - let disk_store_key = self.disk_store.data_store_key(&remote_location_evicted); - self.disk_store - .write_data(disk_store_key.clone(), Some(bytes_vec), None) - .await?; - let mut disk_replacer = self.disk_replacer.lock().await; - if disk_replacer - .put( - remote_location_evicted, - MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), - ) - .is_none() - { - if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { - warn!( - "Failed to clean data ({}) from disk store: {}", - disk_store_key, e - ); - } - warn!( - "Failed to put evicted data ({}) to disk replacer.", - disk_store_key - ); - } - } - } - - // 3. If the data is successfully written to memory, directly return. - if self.mem_store.is_some() && bytes_to_disk.is_none() { - let notify; - { - let mut status_of_keys = self.status_of_keys.write().await; - let ((status, size), notify_ref) = - status_of_keys.get_mut(&remote_location).unwrap(); - *status = Status::Completed; - *size = bytes_mem_written; - { - let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; - mem_replacer.pin(&remote_location, *notify_ref.1.lock().await); - } - notify = notify_ref.clone(); + // If the data is successfully written to memory, directly return. + if bytes_to_disk.is_none() { + self.notify_waiters_mem(&remote_location, bytes_mem_written) + .await; + return Ok(bytes_mem_written); } - // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock - notify.0.notify_waiters(); - return Ok(bytes_mem_written); } - // 4. If the data is not written to memory_cache successfully, then cache it to disk. + // If the data is not written to memory_cache successfully, then cache it to disk. // Need to write the data to network for the current key. let disk_store_key = self.disk_store.data_store_key(&remote_location); let data_size_wrap = self @@ -373,46 +449,42 @@ impl> D .write_data(disk_store_key.clone(), bytes_to_disk, Some(data_stream)) .await; if let Err(e) = data_size_wrap { - self.status_of_keys.write().await.remove(&remote_location); + self.notify_waiters_error(&remote_location).await; return Err(e); } let data_size = data_size_wrap.unwrap(); { let mut disk_replacer = self.disk_replacer.lock().await; - if disk_replacer - .put( - remote_location.clone(), - MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), - ) - .is_none() - { - if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { - // TODO: do we need to notify the caller this failure? - warn!( - "Failed to clean data ({}) from disk store: {}", - disk_store_key, e - ); + match disk_replacer.put( + remote_location.clone(), + MemDiskStoreReplacerValue::new(disk_store_key.clone(), data_size), + ) { + Some(evicted_keys) => { + let mut status_of_keys = self.status_of_keys.write().await; + for evicted_key in evicted_keys { + status_of_keys.remove(&evicted_key); + self.disk_store + .clean_data(&self.disk_store.data_store_key(&evicted_key)) + .await?; + } + } + None => { + if let Err(e) = self.disk_store.clean_data(&disk_store_key).await { + warn!( + "Failed to clean data ({}) from disk store: {}", + disk_store_key, e + ); + } + // We have to notify waiters, because it is the main thread for the current key. 
+ self.notify_waiters_error(&remote_location).await; + return Err(ParpulseError::Internal( + "Failed to put data to disk replacer.".to_string(), + )); } - self.status_of_keys.write().await.remove(&remote_location); - return Err(ParpulseError::Internal( - "Failed to put data to disk replacer.".to_string(), - )); } disk_replacer.pin(&remote_location, 1); } - let notify; - { - let mut status_of_keys = self.status_of_keys.write().await; - let ((status, size), notify_ref) = status_of_keys.get_mut(&remote_location).unwrap(); - *status = Status::Completed; - *size = data_size; - self.disk_replacer - .lock() - .await - .pin(&remote_location, *notify_ref.1.lock().await); - notify = notify_ref.clone(); - } - notify.0.notify_waiters(); + self.notify_waiters_disk(&remote_location, data_size).await; Ok(data_size) } } diff --git a/storage-node/src/storage_manager.rs b/storage-node/src/storage_manager.rs index 96f5e5d..c5a867e 100644 --- a/storage-node/src/storage_manager.rs +++ b/storage-node/src/storage_manager.rs @@ -611,4 +611,73 @@ mod tests { assert!(result.6.is_ok()); assert_eq!(consume_receiver(result.6.unwrap().unwrap()).await, 930); } + + #[tokio::test] + async fn test_evict_disk() { + let disk_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + None, + None, + ); + let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata2.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let res1 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res1.is_ok()); + assert_eq!(consume_receiver(res1.unwrap()).await, 112193); + let res2 = storage_manager.get_data(request_data2.clone(), true).await; + assert!(res2.is_ok()); + assert_eq!(consume_receiver(res2.unwrap()).await, 113629); + let res3 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res3.is_ok()); + assert_eq!(consume_receiver(res3.unwrap()).await, 112193); + } + + #[tokio::test] + async fn test_evict_mem() { + let disk_cache = LruReplacer::new(10); + let mem_cache = LruReplacer::new(120000); + + let tmp = tempfile::tempdir().unwrap(); + let disk_cache_base_path = tmp.path().to_owned(); + + let data_store_cache = MemDiskStoreCache::new( + disk_cache, + disk_cache_base_path.display().to_string(), + Some(mem_cache), + Some(120000), + ); + let storage_manager = Arc::new(StorageManager::new(vec![data_store_cache])); + + let request_path_bucket1 = "tests-parquet".to_string(); + let request_path_keys1 = vec!["userdata2.parquet".to_string()]; + let request_data1 = RequestParams::MockS3((request_path_bucket1, request_path_keys1)); + + let request_path_bucket2 = "tests-parquet".to_string(); + let request_path_keys2 = vec!["userdata1.parquet".to_string()]; + let request_data2 = RequestParams::MockS3((request_path_bucket2, request_path_keys2)); + + let res1 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res1.is_ok()); + assert_eq!(consume_receiver(res1.unwrap()).await, 112193); + let res2 = 
storage_manager.get_data(request_data2.clone(), true).await; + assert!(res2.is_ok()); + assert_eq!(consume_receiver(res2.unwrap()).await, 113629); + let res3 = storage_manager.get_data(request_data1.clone(), true).await; + assert!(res3.is_ok()); + assert_eq!(consume_receiver(res3.unwrap()).await, 112193); + } } From 68057659b39c5249d9332e760a7a070b430608b6 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Tue, 30 Apr 2024 11:53:43 -0400 Subject: [PATCH 2/4] fix: later finding completed threads need pin --- .../src/cache/data_store_cache/memdisk/mod.rs | 93 +++++++++++-------- 1 file changed, 56 insertions(+), 37 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index 7a3dde0..f2f0d87 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -82,7 +82,8 @@ impl MemDiskStoreNotify { #[derive(Clone, PartialEq)] enum Status { Incompleted, - Completed, + MemCompleted, + DiskCompleted, } pub struct MemDiskStoreCache< @@ -172,7 +173,7 @@ impl> } } notify = notify_ref.clone(); - *status = Status::Completed; + *status = Status::MemCompleted; } // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock notify.inner.notify_waiters(); @@ -183,7 +184,7 @@ impl> { let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap(); - *status = Status::Completed; + *status = Status::DiskCompleted; *size = bytes_disk_written; let notify_waiter = notify_ref.waiter_count.lock().await; if *notify_waiter > 0 { @@ -264,48 +265,63 @@ impl> D _data_size: Option, mut data_stream: StorageReaderStream, ) -> ParpulseResult { + // If in_progress of remote_location thread fails, it will clean the data from this hash map. { - // If in_progress of remote_location thread fails, it will clean the data from this hash map. - let mut existed = false; - loop { - let status_of_keys = self.status_of_keys.read().await; - if let Some(((status, size), notify_ref)) = status_of_keys.get(&remote_location) { - let notify; - existed = true; - match status { - Status::Incompleted => { - notify = notify_ref.clone(); - *notify_ref.waiter_count.lock().await += 1; - // The Notified future is guaranteed to receive wakeups from notify_waiters() - // as soon as it has been created, even if it has not yet been polled. - let notified = notify.inner.notified(); - drop(status_of_keys); - notified.await; + let status_of_keys = self.status_of_keys.read().await; + if let Some(((status, size), notify_ref)) = status_of_keys.get(&remote_location) { + let notify; + + match status { + Status::Incompleted => { + notify = notify_ref.clone(); + *notify_ref.waiter_count.lock().await += 1; + // The Notified future is guaranteed to receive wakeups from notify_waiters() + // as soon as it has been created, even if it has not yet been polled. 
+ let notified = notify.inner.notified(); + drop(status_of_keys); + notified.await; + if let Some(((status, size), _)) = + self.status_of_keys.read().await.get(&remote_location) + { + assert!( + *status == Status::MemCompleted || *status == Status::DiskCompleted + ); + return Ok(*size); + } else { + return Err(ParpulseError::Internal( + "Put_data_to_cache fails".to_string(), + )); } - Status::Completed => { + } + Status::MemCompleted => { + // not be notified + let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; + if mem_replacer.peek(&remote_location).is_some() { + mem_replacer.pin(&remote_location, 1); return Ok(*size); } + // If mem_replacer has no data, then update status_of_keys + } + Status::DiskCompleted => { + let mut disk_replacer = self.disk_replacer.lock().await; + if disk_replacer.peek(&remote_location).is_some() { + disk_replacer.pin(&remote_location, 1); + return Ok(*size); + } + // If disk_replacer has no data, then update status_of_keys } - } else { - // FIXME: status_of_keys lock should be released after break - break; } } - if existed { - // Another in progress of remote_location thread fails - return Err(ParpulseError::Internal( - "Put_data_to_cache fails".to_string(), - )); - } - self.status_of_keys.write().await.insert( - remote_location.clone(), - ( - (Status::Incompleted, 0), - Arc::new(MemDiskStoreNotify::new()), - ), - ); } + self.status_of_keys.write().await.insert( + remote_location.clone(), + ( + (Status::Incompleted, 0), + Arc::new(MemDiskStoreNotify::new()), + ), + ); + // TODO: Refine the lock. // TODO(lanlou): Also write the data to network. let mut bytes_to_disk = None; @@ -373,13 +389,16 @@ impl> D drop(mem_replacer); // Put the evicted keys to disk cache. for evicted_key in evicted_keys { - if let Some(((status, _), _)) = status_of_keys.get(&evicted_key.clone()) + if let Some(((status, _), _)) = + status_of_keys.get_mut(&evicted_key.clone()) { // If the key is still in the status_of_keys, it means the data is still in progress. // We should not put the data to disk cache. if *status == Status::Incompleted { continue; } + assert!(*status == Status::MemCompleted); + *status = Status::DiskCompleted; } let (bytes_vec, data_size) = From 6605327467cf14c40abdfb6a4464a3601425b643 Mon Sep 17 00:00:00 2001 From: Lan Lou Date: Tue, 30 Apr 2024 11:58:25 -0400 Subject: [PATCH 3/4] add comments --- storage-node/src/cache/data_store_cache/memdisk/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index f2f0d87..acab2ee 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -294,7 +294,8 @@ impl> D } } Status::MemCompleted => { - // not be notified + // This code only applies to: a thread tries to `put_data_to_cache` but the data is already in cache + // and not be evicted. It is not wait and notified situation. let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; if mem_replacer.peek(&remote_location).is_some() { mem_replacer.pin(&remote_location, 1); @@ -303,6 +304,8 @@ impl> D // If mem_replacer has no data, then update status_of_keys } Status::DiskCompleted => { + // This code only applies to: a thread tries to `put_data_to_cache` but the data is already in cache + // and not be evicted. It is not wait and notified situation. 
let mut disk_replacer = self.disk_replacer.lock().await; if disk_replacer.peek(&remote_location).is_some() { disk_replacer.pin(&remote_location, 1); From 71fd7e4cf68d694f03f55edaeb4b3009ec8db8ca Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Tue, 30 Apr 2024 19:52:36 +0000 Subject: [PATCH 4/4] add debug log --- .../src/cache/data_store_cache/memdisk/mod.rs | 118 ++++++++++++++---- 1 file changed, 95 insertions(+), 23 deletions(-) diff --git a/storage-node/src/cache/data_store_cache/memdisk/mod.rs b/storage-node/src/cache/data_store_cache/memdisk/mod.rs index acab2ee..7d8b5b1 100644 --- a/storage-node/src/cache/data_store_cache/memdisk/mod.rs +++ b/storage-node/src/cache/data_store_cache/memdisk/mod.rs @@ -3,7 +3,7 @@ pub mod data_store; use std::{collections::HashMap, sync::Arc}; use futures::stream::StreamExt; -use log::warn; +use log::{debug, warn}; use tokio::sync::{Mutex, Notify, RwLock}; use crate::{ @@ -31,8 +31,6 @@ pub const DEFAULT_MEM_CACHE_MAX_FILE_SIZE: usize = 1024 * 512; pub type MemDiskStoreReplacerKey = String; /// Status -> completed/incompleted; usize -> file_size -/// Notify -> notify; usize -> notify_waiter_count -/// FIXME: make (Notify, Mutex) a struct to make it more readable. type StatusKeyHashMap = HashMap)>; pub struct MemDiskStoreReplacerValue { @@ -79,7 +77,7 @@ impl MemDiskStoreNotify { } } -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, Debug)] enum Status { Incompleted, MemCompleted, @@ -94,9 +92,6 @@ pub struct MemDiskStoreCache< /// Cache_key = S3_PATH; Cache_value = (CACHE_BASE_PATH + S3_PATH, size) disk_replacer: Arc>, mem_replacer: Option>>, - // MemDiskStoreReplacerKey -> remote_location - // status: 0 -> incompleted; 1 -> completed; 2 -> failed - // TODO(lanlou): we should clean this hashmap. status_of_keys: RwLock, } @@ -153,6 +148,7 @@ impl> notify = status_of_keys.remove(key).unwrap().1; } notify.inner.notify_waiters(); + debug!("Notify waiters error for key {}", key); } async fn notify_waiters_mem(&self, key: &String, bytes_mem_written: usize) { @@ -160,20 +156,28 @@ impl> { let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap(); + *status = Status::MemCompleted; + debug!( + "Notify waiters disk for key {}: set status to MemCompleted", + key, + ); *size = bytes_mem_written; - { - let notify_waiter = notify_ref.waiter_count.lock().await; - if *notify_waiter > 0 { - self.mem_replacer - .as_ref() - .unwrap() - .lock() - .await - .pin(key, *notify_waiter); - } + let notify_waiter = notify_ref.waiter_count.lock().await; + if *notify_waiter > 0 { + self.mem_replacer + .as_ref() + .unwrap() + .lock() + .await + .pin(key, *notify_waiter); + debug!( + "Notify waiters mem for key {}: pin with waiter count {}", + key, *notify_waiter + ); + } else { + debug!("Notify waiters mem for key {}: no waiter", key); } notify = notify_ref.clone(); - *status = Status::MemCompleted; } // FIXME: status_of_keys write lock is released here, so the waken thread can immediately grab the lock notify.inner.notify_waiters(); @@ -185,10 +189,20 @@ impl> let mut status_of_keys = self.status_of_keys.write().await; let ((status, size), notify_ref) = status_of_keys.get_mut(key).unwrap(); *status = Status::DiskCompleted; + debug!( + "Notify waiters disk for key {}: set status to DiskCompleted", + key, + ); *size = bytes_disk_written; let notify_waiter = notify_ref.waiter_count.lock().await; if *notify_waiter > 0 { self.disk_replacer.lock().await.pin(key, *notify_waiter); + debug!( + "Notify waiters disk 
for key {}: pin with waiter count {}", + key, *notify_waiter + ); + } else { + debug!("Notify waiters disk for key {}: no waiter", key); } notify = notify_ref.clone(); } @@ -220,6 +234,10 @@ impl> D .read_data(&data_store_key, self.mem_replacer.as_ref().unwrap().clone()) { Ok(Some(rx)) => { + debug!( + "MemDiskStore get_data_from_cache: directly read data for {} in memory", + remote_location + ); return Ok(Some(rx)); } Ok(None) => { @@ -251,6 +269,10 @@ impl> D ) .await? { + debug!( + "MemDiskStore get_data_from_cache: directly read data for {} from disk", + remote_location + ); Ok(Some(data)) } else { return Err(ParpulseError::Internal( @@ -273,6 +295,10 @@ impl> D match status { Status::Incompleted => { + debug!( + "MemDiskStore put_data_to_cache: find incompleted status for {}", + remote_location + ); notify = notify_ref.clone(); *notify_ref.waiter_count.lock().await += 1; // The Notified future is guaranteed to receive wakeups from notify_waiters() @@ -298,8 +324,11 @@ impl> D // and not be evicted. It is not wait and notified situation. let mut mem_replacer = self.mem_replacer.as_ref().unwrap().lock().await; if mem_replacer.peek(&remote_location).is_some() { + debug!("MemDiskStore put_data_to_cache: find mem-completed status for {}, remote location already in mem replacer, pin + 1", remote_location); mem_replacer.pin(&remote_location, 1); return Ok(*size); + } else { + debug!("MemDiskStore put_data_to_cache:find mem-completed status for {}, no remote location in mem replacer", remote_location); } // If mem_replacer has no data, then update status_of_keys } @@ -308,8 +337,11 @@ impl> D // and not be evicted. It is not wait and notified situation. let mut disk_replacer = self.disk_replacer.lock().await; if disk_replacer.peek(&remote_location).is_some() { + debug!("MemDiskStore put_data_to_cache: find disk-completed status for {}, remote location already in disk replacer, pin + 1", remote_location); disk_replacer.pin(&remote_location, 1); return Ok(*size); + } else { + debug!("MemDiskStore put_data_to_cache: find disk-completed status for {}, no remote location in disk replacer", remote_location); } // If disk_replacer has no data, then update status_of_keys } @@ -324,6 +356,10 @@ impl> D Arc::new(MemDiskStoreNotify::new()), ), ); + debug!( + "MemDiskStore put_data_to_cache: put incompleted status for {} into status_of_keys", + remote_location + ); // TODO: Refine the lock. // TODO(lanlou): Also write the data to network. @@ -345,6 +381,10 @@ impl> D { // If write_data returns something, it means the file size is too large // to fit in the memory. We should put it to disk cache. + debug!( + "MemDiskStore put_data_to_cache: data size for {} not fit in memory, transfer data to disk", + remote_location + ); bytes_to_disk = Some(bytes_vec); break; } @@ -367,9 +407,8 @@ impl> D remote_location.clone(), MemDiskStoreReplacerValue::new(remote_location.clone(), bytes_mem_written), ); - // Insertion fails. if replacer_put_status.is_none() { - // Put the data to disk cache. + // If inserting to memory fails, put the data to disk cache. bytes_to_disk = Some( mem_store .write() @@ -381,6 +420,10 @@ impl> D } else { // TODO(lanlou): currently the pin/unpin relies on after putting, it will **always** get! mem_replacer.pin(&remote_location, 1); + debug!( + "MemDiskStore put_data_to_cache: mem pin for {} + 1", + remote_location + ); // If successfully putting it into mem_replacer, we should record the evicted data, // delete them from mem_store, and put all of them to disk cache. 
if let Some(evicted_keys) = replacer_put_status { @@ -398,10 +441,17 @@ impl> D // If the key is still in the status_of_keys, it means the data is still in progress. // We should not put the data to disk cache. if *status == Status::Incompleted { + debug!( + "MemDiskStore put_data_to_cache: key {} still in progress, skip and not put to disk", + evicted_key + ); continue; } assert!(*status == Status::MemCompleted); *status = Status::DiskCompleted; + debug!( + "MemDiskStore put_data_to_cache: set status for key {} from MemCompleted to DiskCompleted", + evicted_key); } let (bytes_vec, data_size) = @@ -427,7 +477,13 @@ impl> D for evicted_key in evicted_keys { // FIXME: I think all status of the evicted_key removed here should be // `Completed`? - status_of_keys.remove(&evicted_key); + let removed_status = status_of_keys.remove(&evicted_key); + if let Some(((status, _), _)) = removed_status { + debug!( + "MemDiskStore put_data_to_cache: 1 remove status {:?} for key {}", + status,evicted_key + ); + } self.disk_store .clean_data( &self.disk_store.data_store_key(&evicted_key), @@ -447,7 +503,13 @@ impl> D // It is not main in-progress thread for evicted_key, and its status should be // `Completed`, so there is totally no need to notify the waiters. // FIXME: can we safely remove evicted_key here? - status_of_keys.remove(&evicted_key); + let removed_status = status_of_keys.remove(&evicted_key); + if let Some(((status, _), _)) = removed_status { + debug!( + "MemDiskStore put_data_to_cache: 2 remove status {:?} for key {}", + status,evicted_key + ); + } } } } @@ -484,7 +546,13 @@ impl> D Some(evicted_keys) => { let mut status_of_keys = self.status_of_keys.write().await; for evicted_key in evicted_keys { - status_of_keys.remove(&evicted_key); + let removed_status = status_of_keys.remove(&evicted_key); + if let Some(((status, _), _)) = removed_status { + debug!( + "MemDiskStore put_data_to_cache: 3 remove status {:?} for key {}", + status, evicted_key + ); + } self.disk_store .clean_data(&self.disk_store.data_store_key(&evicted_key)) .await?; @@ -505,6 +573,10 @@ impl> D } } disk_replacer.pin(&remote_location, 1); + debug!( + "MemDiskStore put_data_to_cache: disk pin for {} + 1", + remote_location + ); } self.notify_waiters_disk(&remote_location, data_size).await; Ok(data_size)
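
The heart of this series is the coordination between the thread that fetches a key and the threads that arrive while the fetch is in flight: a waiter registers against the shared MemDiskStoreNotify (a tokio::sync::Notify plus a waiter count) before it releases the status_of_keys lock, and the fetching thread pins the finished entry once per registered waiter before calling notify_waiters(), so the replacer cannot evict the entry before every waiter has streamed it out. The sketch below is a minimal, self-contained illustration of that choreography, not the crate's actual API: StoreNotify, FakeReplacer, Cache, and the main driver are hypothetical stand-ins, and only the Notify / waiter-count / pin ordering is meant to mirror the patch.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify, RwLock};

#[derive(PartialEq)]
enum Status {
    Incompleted,
    Completed,
}

// Mirrors the patch's `MemDiskStoreNotify`: a `Notify` plus a waiter counter,
// so the completing thread knows how many pins to add before waking anyone.
struct StoreNotify {
    inner: Notify,
    waiter_count: Mutex<usize>,
}

// Hypothetical stand-in for the crate's replacer; it only records pin counts.
#[derive(Default)]
struct FakeReplacer {
    pins: HashMap<String, usize>,
}

impl FakeReplacer {
    fn pin(&mut self, key: &str, count: usize) {
        *self.pins.entry(key.to_string()).or_insert(0) += count;
    }
}

struct Cache {
    status_of_keys: RwLock<HashMap<String, ((Status, usize), Arc<StoreNotify>)>>,
    replacer: Mutex<FakeReplacer>,
}

impl Cache {
    // A thread that finds the key in progress registers itself before releasing
    // the map lock: it bumps `waiter_count` and creates the `Notified` future,
    // which is guaranteed to see a later `notify_waiters()` even if unpolled.
    async fn wait_for(&self, key: &str) -> Option<usize> {
        let status_of_keys = self.status_of_keys.read().await;
        let ((status, size), notify_ref) = status_of_keys.get(key)?;
        if *status == Status::Completed {
            return Some(*size);
        }
        let notify = notify_ref.clone();
        *notify.waiter_count.lock().await += 1;
        let notified = notify.inner.notified();
        drop(status_of_keys);
        notified.await;
        let guard = self.status_of_keys.read().await;
        guard.get(key).map(|((_, size), _)| *size)
    }

    // The in-progress thread on success: mark the key completed, pin it once per
    // registered waiter so the entry cannot be evicted before every waiter has
    // streamed it out, release the write lock, then wake all waiters.
    async fn complete(&self, key: &str, size: usize) {
        let notify;
        {
            let mut status_of_keys = self.status_of_keys.write().await;
            let ((status, stored_size), notify_ref) = status_of_keys.get_mut(key).unwrap();
            *status = Status::Completed;
            *stored_size = size;
            let waiters = *notify_ref.waiter_count.lock().await;
            if waiters > 0 {
                self.replacer.lock().await.pin(key, waiters);
            }
            notify = notify_ref.clone();
        }
        notify.inner.notify_waiters();
    }
}

#[tokio::main] // tokio with the "full" feature set is assumed.
async fn main() {
    let cache = Arc::new(Cache {
        status_of_keys: RwLock::new(HashMap::new()),
        replacer: Mutex::new(FakeReplacer::default()),
    });
    let key = "tests-parquet/userdata1.parquet".to_string();
    cache.status_of_keys.write().await.insert(
        key.clone(),
        (
            (Status::Incompleted, 0),
            Arc::new(StoreNotify { inner: Notify::new(), waiter_count: Mutex::new(0) }),
        ),
    );
    let waiter = {
        let (cache, key) = (cache.clone(), key.clone());
        tokio::spawn(async move { cache.wait_for(&key).await })
    };
    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    cache.complete(&key, 113_629).await;
    println!("waiter observed size: {:?}", waiter.await.unwrap());
    println!("pins recorded: {:?}", cache.replacer.lock().await.pins);
}

The counterpart of the per-waiter pin is the unpin on the read path: patch 1 moves the disk store's unpin into the `None` arm of the stream loop, so each reader releases its pin exactly when its stream is fully drained rather than after the spawned task's loop exits.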