Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pv naming #22

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions benches/nkv_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ fn bench_nkv(c: &mut Criterion) {
group.bench_function(format!("nkv_new"), |b| {
b.to_async(&rt).iter(|| async {
let temp_dir = TempDir::new().unwrap();
let result = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let result = NkvCore::new(storage).unwrap();
black_box(result)
})
});
Expand All @@ -36,8 +37,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|input| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
let result = nkv.put("key1", input).await;
black_box(result)
},
Expand All @@ -55,8 +56,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|(data, new_data)| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
nkv.put("key1", data).await;
let result = nkv.put("key1", new_data).await;
black_box(result)
Expand All @@ -70,8 +71,8 @@ fn bench_nkv(c: &mut Criterion) {
|| {
let data = vec![0u8; size].into_boxed_slice();
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(nkv.put("key1", data));
nkv
Expand All @@ -89,8 +90,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|data| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();

nkv.put("key1", data).await;
let result = nkv.delete("key1").await;
Expand Down
152 changes: 50 additions & 102 deletions src/nkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@

use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::path::PathBuf;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::{fs, future::Future, io};

use crate::errors::NotifyKeyValueError;
use crate::traits::{StorageEngine, Value};
use crate::traits::StorageEngine;
use crate::trie::{Trie, TrieNode};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -131,74 +130,39 @@ impl Notification {
}
}

type N = Arc<Mutex<Notification>>;

pub struct NkvCore<P: StorageEngine> {
state: Trie<Value<P>>,
persist_path: PathBuf,
notifiers: Trie<N>,
storage: P,
}

impl<P: StorageEngine> NkvCore<P> {
pub fn new(path: std::path::PathBuf) -> std::io::Result<Self> {
let mut res = Self {
state: Trie::new(),
persist_path: path,
pub fn new(storage: P) -> std::io::Result<Self> {
let res = Self {
notifiers: Trie::new(),
storage,
};

if !res.persist_path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a file, not a directory", res.persist_path),
));
}

for entry in fs::read_dir(&res.persist_path)? {
let entry = entry?;
let path = entry.path();

if path.is_file() {
match path.file_name() {
Some(fp) => {
let key = fp.to_str().unwrap();
let value = Value {
pv: P::from_checkpoint(&path)?,
notifier: Arc::new(Mutex::new(Notification::new())),
};

res.state.insert(key, value);
}
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a directory, not a file", path),
))
}
}
} else if path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a directory, not a file", path),
));
}
}

Ok(res)
}

pub async fn put(&mut self, key: &str, value: Box<[u8]>) -> Result<(), NotifyKeyValueError> {
let vector: Arc<Mutex<Vec<Arc<Mutex<Notification>>>>> = Arc::new(Mutex::new(Vec::new()));
let vector: Arc<Mutex<Vec<N>>> = Arc::new(Mutex::new(Vec::new()));
let vc = Arc::clone(&vector);

let capture_and_push: Option<
Box<
dyn Fn(&mut TrieNode<Value<P>>) -> Pin<Box<dyn Future<Output = ()> + Send>>
dyn Fn(&mut TrieNode<N>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
>,
> = Some({
Box::new(
move |trie_ref: &mut TrieNode<Value<P>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
move |trie_ref: &mut TrieNode<N>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let notifier_arc = match trie_ref.value.as_ref() {
Some(value) => Arc::clone(&value.notifier),
Some(value) => Arc::clone(&value),
None => return Box::pin(async {}),
};

Expand All @@ -212,23 +176,13 @@ impl<P: StorageEngine> NkvCore<P> {
)
});

if let Some(val) = self.state.get_mut(key, capture_and_push).await {
// TODO: Maybe we can use reference?
// so that we don't have to clone
let _ = val.pv.update(value.clone());
// let _ = val.pv.update(value.clone());
let _ = val
.notifier
self.storage.put(key, value.clone())?;

if let Some(notifier) = self.notifiers.get_mut(key, capture_and_push).await {
let _ = notifier
.lock()
.await
.send_update(key.to_string(), value.clone());
} else {
let val = Value {
// TODO: REMOVE UNWRAP AND ADD PROPER HANDLING
pv: P::new(value.clone(), self.persist_path.join(key)).unwrap(),
notifier: Arc::new(Mutex::new(Notification::new())),
};
self.state.insert(key, val);
}

let vector_lock = vector.lock().await;
Expand All @@ -243,57 +197,49 @@ impl<P: StorageEngine> NkvCore<P> {
}

pub fn get(&self, key: &str) -> Vec<Arc<[u8]>> {
self.state
.get(key)
.iter()
.map(|s| Arc::clone(&s.pv.data()))
.collect()
self.storage.get(key)
}

pub async fn delete(&mut self, key: &str) -> Result<(), NotifyKeyValueError> {
let vector: Arc<Mutex<Vec<Arc<Mutex<Notification>>>>> = Arc::new(Mutex::new(Vec::new()));
let vector: Arc<Mutex<Vec<N>>> = Arc::new(Mutex::new(Vec::new()));
let vc = Arc::clone(&vector);

let capture_and_push: Option<
Box<
dyn Fn(&mut TrieNode<Value<P>>) -> Pin<Box<dyn Future<Output = ()> + Send>>
dyn Fn(&mut TrieNode<N>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
>,
> = Some({
Box::new(
move |trie_ref: &mut TrieNode<Value<P>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
move |trie_ref: &mut TrieNode<N>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let notifier_arc = match trie_ref.value.as_ref() {
Some(value) => Arc::clone(&value.notifier),
Some(value) => Arc::clone(&value),
None => return Box::pin(async {}),
};

let vector_clone = Arc::clone(&vc);

Box::pin(async move {
{
let _notifier = notifier_arc.lock().await;
}

let mut vector_lock = vector_clone.lock().await;
vector_lock.push(notifier_arc);
})
},
)
});

if let Some(val) = self.state.get_mut(key, capture_and_push).await {
if let Some(val) = self.notifiers.get_mut(key, capture_and_push).await {
{
let vec_lock = vector.lock().await;
for n_arc in vec_lock.iter() {
n_arc.lock().await.send_close(key.to_string())?;
}
}
val.notifier.lock().await.unsubscribe_all(key)?;
val.pv.delete_checkpoint()?;
val.lock().await.unsubscribe_all(key)?;
}
self.state.remove(key);
self.storage.delete(key)?;
self.notifiers.remove(key);
Ok(())
}

Expand All @@ -302,16 +248,13 @@ impl<P: StorageEngine> NkvCore<P> {
key: &str,
uuid: String,
) -> Result<mpsc::UnboundedReceiver<Message>, NotifyKeyValueError> {
if let Some(val) = self.state.get_mut(key, None).await {
return Ok(val.notifier.lock().await.subscribe(uuid)?);
if let Some(val) = self.notifiers.get_mut(key, None).await {
return Ok(val.lock().await.subscribe(uuid)?);
} else {
// Client can subscribe to a non-existent value
let val = Value {
pv: P::new(Box::new([]), self.persist_path.join(key))?,
notifier: Arc::new(Mutex::new(Notification::new())),
};
let tx = val.notifier.lock().await.subscribe(uuid)?;
self.state.insert(key, val);
let n = Arc::new(Mutex::new(Notification::new()));
let tx = n.lock().await.subscribe(uuid)?;
self.notifiers.insert(key, n);
return Ok(tx);
}
}
Expand All @@ -321,11 +264,8 @@ impl<P: StorageEngine> NkvCore<P> {
key: &str,
uuid: String,
) -> Result<(), NotifyKeyValueError> {
if let Some(val) = self.state.get_mut(key, None).await {
val.notifier
.lock()
.await
.unsubscribe(key.to_string(), &uuid)?;
if let Some(val) = self.notifiers.get_mut(key, None).await {
val.lock().await.unsubscribe(key.to_string(), &uuid)?;
Ok(())
} else {
Err(NotifyKeyValueError::NotFound)
Expand All @@ -345,7 +285,8 @@ mod tests {
#[tokio::test]
async fn test_put_and_get() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -359,7 +300,8 @@ mod tests {
#[tokio::test]
async fn test_get_nonexistent_key() -> Result<()> {
let temp_dir = TempDir::new()?;
let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let nkv = NkvCore::new(storage)?;

let result = nkv.get("nonexistent_key");
assert_eq!(result, Vec::new());
Expand All @@ -370,7 +312,8 @@ mod tests {
#[tokio::test]
async fn test_delete() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -385,7 +328,8 @@ mod tests {
#[tokio::test]
async fn test_update_value() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand All @@ -408,13 +352,15 @@ mod tests {
let data3: Box<[u8]> = Box::new([10, 11, 12, 13, 14]);

{
let mut nkv = NkvCore::<FileStorage>::new(path.clone())?;
let storage = FileStorage::new(path.clone())?;
let mut nkv = NkvCore::new(storage)?;
nkv.put("key1", data1.clone()).await?;
nkv.put("key2", data2.clone()).await?;
nkv.put("key3", data3.clone()).await?;
}

let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(path)?;
let nkv = NkvCore::new(storage)?;
let result = nkv.get("key1");
assert_eq!(result, vec!(Arc::from(data1)));

Expand All @@ -430,7 +376,8 @@ mod tests {
#[tokio::test]
async fn test_subsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down Expand Up @@ -466,7 +413,8 @@ mod tests {
#[tokio::test]
async fn test_unsubsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down
Loading
Loading