Skip to content

Commit

Permalink
Add Converter trait and JsonConverter trait
Browse files Browse the repository at this point in the history
In order to support backward compatibility with existing systems
if you are migrating to nkv or want to fit into a specific file
structure Converter trait was added into NkvCore

Signed-off-by: Pavel Abramov <[email protected]>
  • Loading branch information
uncleDecart committed Oct 14, 2024
1 parent e729905 commit 5a9e6c2
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 44 deletions.
21 changes: 11 additions & 10 deletions benches/nkv_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use nkv::nkv::NkvCore;
use nkv::persist_value::FileStorage;
use nkv::persist_value::{FileStorage, JsonConverter};
use nkv::srv::{BaseMsg, GetMsg, PutMsg, Server};
use nkv::trie::Trie;
use tempfile::TempDir;
Expand All @@ -21,7 +21,8 @@ fn bench_nkv(c: &mut Criterion) {
group.bench_function(format!("nkv_new"), |b| {
b.to_async(&rt).iter(|| async {
let temp_dir = TempDir::new().unwrap();
let result = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap();
let result = NkvCore::<FileStorage, JsonConverter>::new(c).unwrap();
black_box(result)
})
});
Expand All @@ -36,8 +37,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|input| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c).unwrap();
let result = nkv.put("key1", input).await;
black_box(result)
},
Expand All @@ -55,8 +56,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|(data, new_data)| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c).unwrap();
nkv.put("key1", data).await;
let result = nkv.put("key1", new_data).await;
black_box(result)
Expand All @@ -70,8 +71,8 @@ fn bench_nkv(c: &mut Criterion) {
|| {
let data = vec![0u8; size].into_boxed_slice();
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c).unwrap();
let rt = Runtime::new().unwrap();
rt.block_on(nkv.put("key1", data));
nkv
Expand All @@ -89,8 +90,8 @@ fn bench_nkv(c: &mut Criterion) {
},
|data| async {
let temp_dir = TempDir::new().unwrap();
let mut nkv =
NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf()).unwrap();
let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap();
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c).unwrap();

nkv.put("key1", data).await;
let result = nkv.delete("key1").await;
Expand Down
61 changes: 32 additions & 29 deletions src/nkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@

use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::{fs, future::Future, io};

use crate::errors::NotifyKeyValueError;
use crate::traits::{StorageEngine, Value};
use crate::traits::{Converter, StorageEngine, Value};
use crate::trie::{Trie, TrieNode};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -131,39 +130,33 @@ impl Notification {
}
}

pub struct NkvCore<P: StorageEngine> {
pub struct NkvCore<P: StorageEngine, C: Converter> {
state: Trie<Value<P>>,
persist_path: PathBuf,
converter: C,
}

impl<P: StorageEngine> NkvCore<P> {
pub fn new(path: std::path::PathBuf) -> std::io::Result<Self> {
impl<P: StorageEngine, C: Converter> NkvCore<P, C> {
pub fn new(converter: C) -> std::io::Result<Self> {
let mut res = Self {
state: Trie::new(),
persist_path: path,
converter,
};

if !res.persist_path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a file, not a directory", res.persist_path),
));
}

for entry in fs::read_dir(&res.persist_path)? {
for entry in fs::read_dir(&res.converter.root())? {
let entry = entry?;
let path = entry.path();

if path.is_file() {
match path.file_name() {
Some(fp) => {
let key = fp.to_str().unwrap();
Some(_) => {
let value = Value {
pv: P::from_checkpoint(&path)?,
notifier: Arc::new(Mutex::new(Notification::new())),
};

res.state.insert(key, value);
// TODO: remove unwrap
let key = res.converter.path_to_key(&path).unwrap();
res.state.insert(&key, value);
}
None => {
return Err(io::Error::new(
Expand Down Expand Up @@ -223,9 +216,10 @@ impl<P: StorageEngine> NkvCore<P> {
.await
.send_update(key.to_string(), value.clone());
} else {
let pv_key = self.converter.key_to_path(key);
let val = Value {
// TODO: REMOVE UNWRAP AND ADD PROPER HANDLING
pv: P::new(value.clone(), self.persist_path.join(key)).unwrap(),
pv: P::new(value.clone(), self.converter.root().join(pv_key)).unwrap(),
notifier: Arc::new(Mutex::new(Notification::new())),
};
self.state.insert(key, val);
Expand Down Expand Up @@ -306,8 +300,9 @@ impl<P: StorageEngine> NkvCore<P> {
return Ok(val.notifier.lock().await.subscribe(uuid)?);
} else {
// Client can subscribe to a non-existent value
let pv_key = &self.converter.key_to_path(key);
let val = Value {
pv: P::new(Box::new([]), self.persist_path.join(key))?,
pv: P::new(Box::new([]), self.converter.root().join(pv_key))?,
notifier: Arc::new(Mutex::new(Notification::new())),
};
let tx = val.notifier.lock().await.subscribe(uuid)?;
Expand Down Expand Up @@ -335,7 +330,7 @@ impl<P: StorageEngine> NkvCore<P> {

#[cfg(test)]
mod tests {
use crate::persist_value::FileStorage;
use crate::persist_value::{FileStorage, JsonConverter};

use super::*;
use anyhow::Result;
Expand All @@ -345,7 +340,8 @@ mod tests {
#[tokio::test]
async fn test_put_and_get() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -359,7 +355,8 @@ mod tests {
#[tokio::test]
async fn test_get_nonexistent_key() -> Result<()> {
let temp_dir = TempDir::new()?;
let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let result = nkv.get("nonexistent_key");
assert_eq!(result, Vec::new());
Expand All @@ -370,7 +367,8 @@ mod tests {
#[tokio::test]
async fn test_delete() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data.clone()).await?;
Expand All @@ -385,7 +383,8 @@ mod tests {
#[tokio::test]
async fn test_update_value() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand All @@ -408,13 +407,15 @@ mod tests {
let data3: Box<[u8]> = Box::new([10, 11, 12, 13, 14]);

{
let mut nkv = NkvCore::<FileStorage>::new(path.clone())?;
let c = JsonConverter::new(path.clone())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;
nkv.put("key1", data1.clone()).await?;
nkv.put("key2", data2.clone()).await?;
nkv.put("key3", data3.clone()).await?;
}

let nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;
let result = nkv.get("key1");
assert_eq!(result, vec!(Arc::from(data1)));

Expand All @@ -430,7 +431,8 @@ mod tests {
#[tokio::test]
async fn test_subsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down Expand Up @@ -466,7 +468,8 @@ mod tests {
#[tokio::test]
async fn test_unsubsribe() -> Result<()> {
let temp_dir = TempDir::new()?;
let mut nkv = NkvCore::<FileStorage>::new(temp_dir.path().to_path_buf())?;
let c = JsonConverter::new(temp_dir.path().to_path_buf())?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;

let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]);
nkv.put("key1", data).await?;
Expand Down
47 changes: 45 additions & 2 deletions src/persist_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
// on a file system. Writing to a disk is an
// atomic operation

use crate::traits::StorageEngine;
use crate::traits::{Converter, StorageEngine};
use std::fs;
use std::io::Write;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::NamedTempFile;
Expand All @@ -17,6 +17,49 @@ pub struct FileStorage {
fp: PathBuf,
}

pub struct JsonConverter {
root: PathBuf,
}

impl JsonConverter {
pub fn new(root: PathBuf) -> io::Result<Self> {
if !root.is_dir() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} is a file, not a directory", root),
));
}

Ok(Self { root })
}
}

impl Converter for JsonConverter {
fn root(&self) -> &PathBuf {
return &self.root;
}
fn key_to_path(&self, key: &str) -> String {
key.replace('.', "/") + ".json"
}
fn path_to_key(&self, input: &PathBuf) -> Option<String> {
// Strip the provided prefix from the input path
if let Ok(stripped) = input.strip_prefix(&self.root) {
let input_str = stripped.to_string_lossy();

// Check if the string ends with ".json"
if input_str.ends_with(".json") {
// Remove the ".json" and replace slashes with dots
let without_json = input_str.trim_end_matches(".json");
Some(without_json.replace('/', "."))
} else {
None
}
} else {
None // If the prefix doesn't match, return None
}
}
}

impl StorageEngine for FileStorage {
fn new(new_data: Box<[u8]>, filepath: PathBuf) -> std::io::Result<Self> {
// Create parent folders if they don't exist
Expand Down
5 changes: 3 additions & 2 deletions src/srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::sync::{mpsc, oneshot};
use crate::errors::NotifyKeyValueError;
use crate::nkv::NkvCore;
use crate::notifier::{Notifier, TcpWriter};
use crate::persist_value::FileStorage;
use crate::persist_value::{FileStorage, JsonConverter};
use crate::request_msg::{self, BaseMessage, PutMessage, ServerRequest, ServerResponse};

pub struct PutMsg {
Expand Down Expand Up @@ -72,7 +72,8 @@ impl Server {
let (cancel_tx, cancel_rx) = oneshot::channel();
let (usr_cancel_tx, mut usr_cancel_rx) = oneshot::channel();

let mut nkv = NkvCore::<FileStorage>::new(path)?;
let c = JsonConverter::new(path)?;
let mut nkv = NkvCore::<FileStorage, JsonConverter>::new(c)?;
let addr: SocketAddr = addr.parse().expect("Unable to parse addr");

let srv = Self {
Expand Down
7 changes: 6 additions & 1 deletion src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::nkv::Notification;
// use std::fmt::{self, Debug, Error};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -19,6 +18,12 @@ pub trait StorageEngine: PartialEq {
fn data(&self) -> Arc<[u8]>;
}

pub trait Converter {
fn key_to_path(&self, key: &str) -> String;
fn path_to_key(&self, input: &PathBuf) -> Option<String>;
fn root(&self) -> &PathBuf;
}

#[derive(Debug)]
pub struct Value<V: StorageEngine> {
pub pv: V,
Expand Down

0 comments on commit 5a9e6c2

Please sign in to comment.