Skip to content

Commit

Permalink
FileStorage: make keyspaces as folders and save files as json
Browse files Browse the repository at this point in the history
This is to be compatible with EVE-OS PubSub

Signed-off-by: Pavel Abramov <[email protected]>
  • Loading branch information
uncleDecart committed Oct 15, 2024
1 parent e729905 commit 406aba5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 28 deletions.
15 changes: 5 additions & 10 deletions src/nkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,13 @@ impl<P: StorageEngine> NkvCore<P> {

if path.is_file() {
match path.file_name() {
Some(fp) => {
let key = fp.to_str().unwrap();
Some(_) => {
let value = Value {
pv: P::from_checkpoint(&path)?,
pv: P::from_checkpoint(&path, res.persist_path.clone())?,
notifier: Arc::new(Mutex::new(Notification::new())),
};

res.state.insert(key, value);
res.state.insert(&value.pv.key(), value);
}
None => {
return Err(io::Error::new(
Expand Down Expand Up @@ -225,7 +224,7 @@ impl<P: StorageEngine> NkvCore<P> {
} else {
let val = Value {
// TODO: REMOVE UNWRAP AND ADD PROPER HANDLING
pv: P::new(value.clone(), self.persist_path.join(key)).unwrap(),
pv: P::new(value.clone(), key, self.persist_path.clone()).unwrap(),
notifier: Arc::new(Mutex::new(Notification::new())),
};
self.state.insert(key, val);
Expand Down Expand Up @@ -272,10 +271,6 @@ impl<P: StorageEngine> NkvCore<P> {
let vector_clone = Arc::clone(&vc);

Box::pin(async move {
{
let _notifier = notifier_arc.lock().await;
}

let mut vector_lock = vector_clone.lock().await;
vector_lock.push(notifier_arc);
})
Expand Down Expand Up @@ -307,7 +302,7 @@ impl<P: StorageEngine> NkvCore<P> {
} else {
// Client can subscribe to a non-existent value
let val = Value {
pv: P::new(Box::new([]), self.persist_path.join(key))?,
pv: P::new(Box::new([]), key, self.persist_path.clone())?,
notifier: Arc::new(Mutex::new(Notification::new())),
};
let tx = val.notifier.lock().await.subscribe(uuid)?;
Expand Down
64 changes: 49 additions & 15 deletions src/persist_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ use tempfile::NamedTempFile;
pub struct FileStorage {
data: Arc<[u8]>,
fp: PathBuf,
root: PathBuf,
}

impl FileStorage {
fn key_to_path(key: &str) -> String {
key.replace('.', "/") + ".json"
}
}

impl StorageEngine for FileStorage {
fn new(new_data: Box<[u8]>, filepath: PathBuf) -> std::io::Result<Self> {
fn new(new_data: Box<[u8]>, key: &str, root: PathBuf) -> std::io::Result<Self> {
let filepath = root.join(Self::key_to_path(key));
// Create parent folders if they don't exist
if let Some(parent_dir) = filepath.parent() {
if !parent_dir.exists() {
Expand All @@ -31,14 +39,16 @@ impl StorageEngine for FileStorage {
Ok(Self {
data: Arc::from(new_data),
fp: filepath,
root,
})
}
fn from_checkpoint(filepath: &Path) -> std::io::Result<Self> {
fn from_checkpoint(filepath: &Path, root: PathBuf) -> std::io::Result<Self> {
let data = fs::read(filepath)?;

Ok(Self {
data: data.into_boxed_slice().into(),
fp: filepath.to_path_buf(),
root,
})
}

Expand All @@ -56,6 +66,23 @@ impl StorageEngine for FileStorage {
fn data(&self) -> Arc<[u8]> {
Arc::clone(&self.data)
}

fn key(&self) -> String {
if let Ok(stripped) = self.fp.strip_prefix(&self.root) {
let input_str = stripped.to_string_lossy();

// Check if the string ends with ".json"
if input_str.ends_with(".json") {
// Remove the ".json" and replace slashes with dots
let without_json = input_str.trim_end_matches(".json");
without_json.replace('/', ".")
} else {
"".to_string()
}
} else {
"".to_string() // If the prefix doesn't match, return None
}
}
}

impl PartialEq for FileStorage {
Expand All @@ -80,33 +107,38 @@ mod tests {

#[test]
fn test_persist_value() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let file_path = temp_dir.path().join("testfile.dat");
let dir = TempDir::new()?;
let path = dir.path();
let key = "testfile".to_string();
let file_path = path.join(FileStorage::key_to_path(&key));

let original_data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);

let persist_value = FileStorage::new(original_data.clone(), file_path.clone())?;
let persist_value = FileStorage::new(original_data.clone(), &key, path.to_path_buf())?;

let data_from_file = fs::read(file_path.clone())?;
assert_eq!(data_from_file, original_data.as_ref());

let loaded_persist_value = FileStorage::from_checkpoint(file_path.as_path())?;
let loaded_persist_value =
FileStorage::from_checkpoint(file_path.as_path(), path.to_path_buf())?;

assert_eq!(persist_value, loaded_persist_value);

temp_dir.close()?;
dir.close()?;

Ok(())
}

#[test]
fn test_update() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let file_path = temp_dir.path().join("testfile.dat");
let dir = TempDir::new()?;
let path = dir.path();
let key = "testfile.dat".to_string();
let file_path = path.join(FileStorage::key_to_path(&key));

let initial_data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);

let mut persist_value = FileStorage::new(initial_data.clone(), file_path.clone())?;
let mut persist_value = FileStorage::new(initial_data.clone(), &key, path.to_path_buf())?;

let new_data: Box<[u8]> = Box::new([6, 7, 8, 9, 10]);

Expand All @@ -117,27 +149,29 @@ mod tests {

assert_eq!(persist_value.data(), new_data.as_ref().into());

temp_dir.close()?;
dir.close()?;

Ok(())
}

#[test]
fn test_delete_checkpoint() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let file_path = temp_dir.path().join("testfile.dat");
let dir = TempDir::new()?;
let path = dir.path();
let key = "testfile.dat".to_string();
let file_path = path.join(FileStorage::key_to_path(&key));

let initial_data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);

let persist_value = FileStorage::new(initial_data.clone(), file_path.clone())?;
let persist_value = FileStorage::new(initial_data.clone(), &key, path.to_path_buf())?;

assert!(file_path.exists());

persist_value.delete_checkpoint()?;

assert!(!file_path.exists());

temp_dir.close()?;
dir.close()?;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
// SPDX-License-Identifier: Apache-2.0

use crate::nkv::Notification;
// use std::fmt::{self, Debug, Error};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;

pub trait StorageEngine: PartialEq {
fn new(data: Box<[u8]>, filepath: PathBuf) -> std::io::Result<Self>
fn new(data: Box<[u8]>, key: &str, root: PathBuf) -> std::io::Result<Self>
where
Self: Sized;

fn from_checkpoint(filepath: &Path) -> std::io::Result<Self>
fn from_checkpoint(filepath: &Path, root: PathBuf) -> std::io::Result<Self>
where
Self: Sized;
fn update(&mut self, new_data: Box<[u8]>) -> std::io::Result<()>;
fn delete_checkpoint(&self) -> std::io::Result<()>;
fn data(&self) -> Arc<[u8]>;
fn key(&self) -> String;
}

#[derive(Debug)]
Expand Down

0 comments on commit 406aba5

Please sign in to comment.