Skip to content

Commit

Permalink
FileStorage: make keyspaces as folders and save files as json
Browse files Browse the repository at this point in the history
This is to be compatible with EVE-OS PubSub

Signed-off-by: Pavel Abramov <[email protected]>
  • Loading branch information
uncleDecart committed Oct 17, 2024
1 parent e729905 commit 5571c7d
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 210 deletions.
19 changes: 10 additions & 9 deletions benches/nkv_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ fn bench_nkv(c: &mut Criterion) {
group.bench_function(format!("nkv_new"), |b| {
b.to_async(&rt).iter(|| async {
let temp_dir = TempDir::new().unwrap();
let result = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let result = NkvCore::new(storage).unwrap();
black_box(result)
})
});
Expand All @@ -36,8 +37,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|input| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
let result = nkv.put("key1", input).await;
black_box(result)
},
Expand All @@ -55,8 +56,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|(data, new_data)| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
nkv.put("key1", data).await;
let result = nkv.put("key1", new_data).await;
black_box(result)
Expand All @@ -70,8 +71,8 @@ fn bench_nkv(c: &mut Criterion) {
|| {
let data = vec![0u8; size].into_boxed_slice();
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(nkv.put("key1", data));
nkv
Expand All @@ -89,8 +90,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|data| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let storage = FileStorage::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::new(storage).unwrap();

nkv.put("key1", data).await;
let result = nkv.delete("key1").await;
Expand Down
152 changes: 50 additions & 102 deletions src/nkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@

use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::path::PathBuf;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::{fs, future::Future, io};

use crate::errors::NotifyKeyValueError;
use crate::traits::{StorageEngine, Value};
use crate::traits::StorageEngine;
use crate::trie::{Trie, TrieNode};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -131,74 +130,39 @@ impl Notification {
}
}

type N = Arc<Mutex<Notification>>;

pub struct NkvCore<P: StorageEngine> {
state: Trie<Value<P>>,
persist_path: PathBuf,
notifiers: Trie<N>,
storage: P,
}

impl<P: StorageEngine> NkvCore<P> {
pub fn new(path: std::path::PathBuf) -> std::io::Result<Self> {
let mut res = Self {
state: Trie::new(),
persist_path: path,
pub fn new(storage: P) -> std::io::Result<Self> {
let res = Self {
notifiers: Trie::new(),
storage,
};

if !res.persist_path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a file, not a directory", res.persist_path),
));
}

for entry in fs::read_dir(&res.persist_path)? {
let entry = entry?;
let path = entry.path();

if path.is_file() {
match path.file_name() {
Some(fp) => {
let key = fp.to_str().unwrap();
let value = Value {
pv: P::from_checkpoint(&path)?,
notifier: Arc::new(Mutex::new(Notification::new())),
};

res.state.insert(key, value);
}
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a directory, not a file", path),
))
}
}
} else if path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a directory, not a file", path),
));
}
}

Ok(res)
}

pub async fn put(&mut self, key: &str, value: Box<[u8]>) -> Result<(), NotifyKeyValueError> {
let vector: Arc<Mutex<Vec<Arc<Mutex<Notification>>>>> = Arc::new(Mutex::new(Vec::new()));
let vector: Arc<Mutex<Vec<N>>> = Arc::new(Mutex::new(Vec::new()));
let vc = Arc::clone(&vector);

let capture_and_push: Option<
Box<
dyn Fn(&mut TrieNode<Value<P>>) -> Pin<Box<dyn Future<Output = ()> + Send>>
dyn Fn(&mut TrieNode<N>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
>,
> = Some({
Box::new(
move |trie_ref: &mut TrieNode<Value<P>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
move |trie_ref: &mut TrieNode<N>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let notifier_arc = match trie_ref.value.as_ref() {
Some(value) => Arc::clone(&value.notifier),
Some(value) => Arc::clone(&value),
None => return Box::pin(async {}),
};

Expand All @@ -212,23 +176,13 @@ impl<P: StorageEngine> NkvCore<P> {
)
});

if let Some(val) = self.state.get_mut(key, capture_and_push).await {
// TODO: Maybe we can use reference?
// so that we don't have to clone
let _ = val.pv.update(value.clone());
// let _ = val.pv.update(value.clone());
let _ = val
.notifier
self.storage.put(key, value.clone())?;

if let Some(notifier) = self.notifiers.get_mut(key, capture_and_push).await {
let _ = notifier
.lock()
.await
.send_update(key.to_string(), value.clone());
} else {
let val = Value {
// TODO: REMOVE UNWRAP AND ADD PROPER HANDLING
pv: P::new(value.clone(), self.persist_path.join(key)).unwrap(),
notifier: Arc::new(Mutex::new(Notification::new())),
};
self.state.insert(key, val);
}

let vector_lock = vector.lock().await;
Expand All @@ -243,57 +197,49 @@ impl<P: StorageEngine> NkvCore<P> {
}

pub fn get(&self, key: &str) -> Vec<Arc<[u8]>> {
self.state
.get(key)
.iter()
.map(|s| Arc::clone(&s.pv.data()))
.collect()
self.storage.get(key)
}

pub async fn delete(&mut self, key: &str) -> Result<(), NotifyKeyValueError> {
let vector: Arc<Mutex<Vec<Arc<Mutex<Notification>>>>> = Arc::new(Mutex::new(Vec::new()));
let vector: Arc<Mutex<Vec<N>>> = Arc::new(Mutex::new(Vec::new()));
let vc = Arc::clone(&vector);

let capture_and_push: Option<
Box<
dyn Fn(&mut TrieNode<Value<P>>) -> Pin<Box<dyn Future<Output = ()> + Send>>
dyn Fn(&mut TrieNode<N>) -> Pin<Box<dyn Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
>,
> = Some({
Box::new(
move |trie_ref: &mut TrieNode<Value<P>>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
move |trie_ref: &mut TrieNode<N>| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let notifier_arc = match trie_ref.value.as_ref() {
Some(value) => Arc::clone(&value.notifier),
Some(value) => Arc::clone(&value),
None => return Box::pin(async {}),
};

let vector_clone = Arc::clone(&vc);

Box::pin(async move {
{
let _notifier = notifier_arc.lock().await;
}

let mut vector_lock = vector_clone.lock().await;
vector_lock.push(notifier_arc);
})
},
)
});

if let Some(val) = self.state.get_mut(key, capture_and_push).await {
if let Some(val) = self.notifiers.get_mut(key, capture_and_push).await {
{
let vec_lock = vector.lock().await;
for n_arc in vec_lock.iter() {
n_arc.lock().await.send_close(key.to_string())?;
}
}
val.notifier.lock().await.unsubscribe_all(key)?;
val.pv.delete_checkpoint()?;
val.lock().await.unsubscribe_all(key)?;
}
self.state.remove(key);
self.storage.delete(key)?;
self.notifiers.remove(key);
Ok(())
}

Expand All @@ -302,16 +248,13 @@ impl<P: StorageEngine> NkvCore<P> {
key: &str,
uuid: String,
) -> Result<mpsc::UnboundedReceiver<Message>, NotifyKeyValueError> {
if let Some(val) = self.state.get_mut(key, None).await {
return Ok(val.notifier.lock().await.subscribe(uuid)?);
if let Some(val) = self.notifiers.get_mut(key, None).await {
return Ok(val.lock().await.subscribe(uuid)?);
} else {
// Client can subscribe to a non-existent value
let val = Value {
pv: P::new(Box::new([]), self.persist_path.join(key))?,
notifier: Arc::new(Mutex::new(Notification::new())),
};
let tx = val.notifier.lock().await.subscribe(uuid)?;
self.state.insert(key, val);
let n = Arc::new(Mutex::new(Notification::new()));
let tx = n.lock().await.subscribe(uuid)?;
self.notifiers.insert(key, n);
return Ok(tx);
}
}
Expand All @@ -321,11 +264,8 @@ impl<P: StorageEngine> NkvCore<P> {
key: &str,
uuid: String,
) -> Result<(), NotifyKeyValueError> {
if let Some(val) = self.state.get_mut(key, None).await {
val.notifier
.lock()
.await
.unsubscribe(key.to_string(), &uuid)?;
if let Some(val) = self.notifiers.get_mut(key, None).await {
val.lock().await.unsubscribe(key.to_string(), &uuid)?;
Ok(())
} else {
Err(NotifyKeyValueError::NotFound)
Expand All @@ -345,7 +285,8 @@ mod tests {
#[tokio::test]
async fn test_put_and_get() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -359,7 +300,8 @@ mod tests {
#[tokio::test]
async fn test_get_nonexistent_key() -> Result<()> {
let temp_dir = TempDir::new()?;
let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let nkv = NkvCore::new(storage)?;

let result = nkv.get("nonexistent_key");
assert_eq!(result, Vec::new());
Expand All @@ -370,7 +312,8 @@ mod tests {
#[tokio::test]
async fn test_delete() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -385,7 +328,8 @@ mod tests {
#[tokio::test]
async fn test_update_value() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand All @@ -408,13 +352,15 @@ mod tests {
let data3: Box<[u8]> = Box::new([10, 11, 12, 13, 14]);

{
let mut nkv = NkvCore::<FileStorage>::new(path.clone())?;
let storage = FileStorage::new(path.clone())?;
let mut nkv = NkvCore::new(storage)?;
nkv.put("key1", data1.clone()).await?;
nkv.put("key2", data2.clone()).await?;
nkv.put("key3", data3.clone()).await?;
}

let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(path)?;
let nkv = NkvCore::new(storage)?;
let result = nkv.get("key1");
assert_eq!(result, vec!(Arc::from(data1)));

Expand All @@ -430,7 +376,8 @@ mod tests {
#[tokio::test]
async fn test_subsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down Expand Up @@ -466,7 +413,8 @@ mod tests {
#[tokio::test]
async fn test_unsubsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let storage = FileStorage::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::new(storage)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down
Loading

0 comments on commit 5571c7d

Please sign in to comment.