Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
lanlou1554 committed Apr 28, 2024
1 parent 86e23e3 commit 9c19d66
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
8 changes: 6 additions & 2 deletions storage-node/src/cache/data_store_cache/memdisk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D
let data_store_key = replacer_value.as_value();
match mem_store.read().await.read_data(data_store_key) {
Ok(Some(rx)) => {
// TODO(lanlou): actually we should unpin after all the data are consumed...
if !mem_replacer.unpin(&remote_location) {
warn!(
"Failed to unpin the key ({}) in memory replacer.",
Expand Down Expand Up @@ -388,8 +389,11 @@ impl<R: DataStoreReplacer<MemDiskStoreReplacerKey, MemDiskStoreReplacerValue>> D
let ((status, size), notify) = status_of_keys.get_mut(&remote_location).unwrap();
*status = Status::Completed;
*size = data_size;
for _ in 0..*notify.1.lock().await {
self.disk_replacer.lock().await.pin(&remote_location);
{
let mut disk_replacer = self.disk_replacer.lock().await;
for _ in 0..*notify.1.lock().await {
disk_replacer.pin(&remote_location);
}
}
notify.0.notify_waiters();
Ok(data_size)
Expand Down
4 changes: 0 additions & 4 deletions storage-node/src/cache/replacer/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ impl<K: ReplacerKey, V: ReplacerValue> LruReplacer<K, V> {
while (self.size + value.size()) > self.max_capacity {
// Previous code guarantees that there is at least 1 element in the cache_map, so we
// can safely unwrap here.
println!(
"-------- To Evicting Key: {:?} --------",
self.cache_map.front().unwrap().1 .1
);
if self.cache_map.front().unwrap().1 .1 > 0 {
// TODO(lanlou): Actually we should look next to evict, but for simplicity, just
// return None here, it is temporarily okay since we will not pin an element for
Expand Down
5 changes: 2 additions & 3 deletions storage-node/src/cache/replacer/lru_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ impl<K: ReplacerKey, V: ReplacerValue> LruKReplacer<K, V> {
let mut evicted_keys = Vec::new();
while (self.size + updated_size) > self.max_capacity {
let key_to_evict = self.evict(&key);
if key_to_evict.is_none() {
return None;
}
// If key_to_evict is none, return none
key_to_evict.as_ref()?;
if let Some(evicted_key) = key_to_evict {
evicted_keys.push(evicted_key);
}
Expand Down
1 change: 1 addition & 0 deletions storage-node/src/disk/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl DiskManager {
if let Some(mut stream) = stream {
let bytes_cur = stream.next().await;
if bytes_cur.is_none() {
file.flush().await?;
return Ok(bytes_written);
}
let mut bytes_cur = bytes_cur.unwrap()?;
Expand Down

0 comments on commit 9c19d66

Please sign in to comment.