From 5a9e6c27196f657de113e46c2f8ededf82782e74 Mon Sep 17 00:00:00 2001 From: Pavel Abramov Date: Mon, 14 Oct 2024 15:22:56 +0200 Subject: [PATCH] Add Converter trait and JsonConverter trait In order to support backward compatibility with existing systems if you are migrating to nkv or want to fit into a specific file structure Converter trait was added into NkvCore Signed-off-by: Pavel Abramov --- benches/nkv_bench.rs | 21 +++++++-------- src/nkv.rs | 61 +++++++++++++++++++++++--------------------- src/persist_value.rs | 47 ++++++++++++++++++++++++++++++++-- src/srv.rs | 5 ++-- src/traits.rs | 7 ++++- 5 files changed, 97 insertions(+), 44 deletions(-) diff --git a/benches/nkv_bench.rs b/benches/nkv_bench.rs index 3bfbe9c..ffbabf2 100644 --- a/benches/nkv_bench.rs +++ b/benches/nkv_bench.rs @@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion}; use nkv::nkv::NkvCore; -use nkv::persist_value::FileStorage; +use nkv::persist_value::{FileStorage, JsonConverter}; use nkv::srv::{BaseMsg, GetMsg, PutMsg, Server}; use nkv::trie::Trie; use tempfile::TempDir; @@ -21,7 +21,8 @@ fn bench_nkv(c: &mut Criterion) { group.bench_function(format!("nkv_new"), |b| { b.to_async(&rt).iter(|| async { let temp_dir = TempDir::new().unwrap(); - let result = NkvCore::::new(temp_dir.path().to_path_buf()).unwrap(); + let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap(); + let result = NkvCore::::new(c).unwrap(); black_box(result) }) }); @@ -36,8 +37,8 @@ fn bench_nkv(c: &mut Criterion) { }, |input| async { let temp_dir = TempDir::new().unwrap(); - let mut nkv = - NkvCore::::new(temp_dir.path().to_path_buf()).unwrap(); + let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap(); + let mut nkv = NkvCore::::new(c).unwrap(); let result = nkv.put("key1", input).await; black_box(result) }, @@ -55,8 +56,8 @@ fn bench_nkv(c: &mut Criterion) { }, |(data, new_data)| async { let temp_dir = TempDir::new().unwrap(); - let mut nkv = - NkvCore::::new(temp_dir.path().to_path_buf()).unwrap(); + let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap(); + let mut nkv = NkvCore::::new(c).unwrap(); nkv.put("key1", data).await; let result = nkv.put("key1", new_data).await; black_box(result) @@ -70,8 +71,8 @@ fn bench_nkv(c: &mut Criterion) { || { let data = vec![0u8; size].into_boxed_slice(); let temp_dir = TempDir::new().unwrap(); - let mut nkv = - NkvCore::::new(temp_dir.path().to_path_buf()).unwrap(); + let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap(); + let mut nkv = NkvCore::::new(c).unwrap(); let rt = Runtime::new().unwrap(); rt.block_on(nkv.put("key1", data)); nkv @@ -89,8 +90,8 @@ fn bench_nkv(c: &mut Criterion) { }, |data| async { let temp_dir = TempDir::new().unwrap(); - let mut nkv = - NkvCore::::new(temp_dir.path().to_path_buf()).unwrap(); + let c = JsonConverter::new(temp_dir.path().to_path_buf()).unwrap(); + let mut nkv = NkvCore::::new(c).unwrap(); nkv.put("key1", data).await; let result = nkv.delete("key1").await; diff --git a/src/nkv.rs b/src/nkv.rs index fe6b07e..026fd68 100644 --- a/src/nkv.rs +++ b/src/nkv.rs @@ -8,13 +8,12 @@ use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use std::{fs, future::Future, io}; use crate::errors::NotifyKeyValueError; -use crate::traits::{StorageEngine, Value}; +use crate::traits::{Converter, StorageEngine, Value}; use crate::trie::{Trie, TrieNode}; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, Mutex}; @@ -131,39 +130,33 @@ impl Notification { } } -pub struct NkvCore { +pub struct NkvCore { state: Trie>, - persist_path: PathBuf, + converter: C, } -impl NkvCore

{ - pub fn new(path: std::path::PathBuf) -> std::io::Result { +impl NkvCore { + pub fn new(converter: C) -> std::io::Result { let mut res = Self { state: Trie::new(), - persist_path: path, + converter, }; - if !res.persist_path.is_dir() { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("{:?} is a file, not a directory", res.persist_path), - )); - } - - for entry in fs::read_dir(&res.persist_path)? { + for entry in fs::read_dir(&res.converter.root())? { let entry = entry?; let path = entry.path(); if path.is_file() { match path.file_name() { - Some(fp) => { - let key = fp.to_str().unwrap(); + Some(_) => { let value = Value { pv: P::from_checkpoint(&path)?, notifier: Arc::new(Mutex::new(Notification::new())), }; - res.state.insert(key, value); + // TODO: remove unwrap + let key = res.converter.path_to_key(&path).unwrap(); + res.state.insert(&key, value); } None => { return Err(io::Error::new( @@ -223,9 +216,10 @@ impl NkvCore

{ .await .send_update(key.to_string(), value.clone()); } else { + let pv_key = self.converter.key_to_path(key); let val = Value { // TODO: REMOVE UNWRAP AND ADD PROPER HANDLING - pv: P::new(value.clone(), self.persist_path.join(key)).unwrap(), + pv: P::new(value.clone(), self.converter.root().join(pv_key)).unwrap(), notifier: Arc::new(Mutex::new(Notification::new())), }; self.state.insert(key, val); @@ -306,8 +300,9 @@ impl NkvCore

{ return Ok(val.notifier.lock().await.subscribe(uuid)?); } else { // Client can subscribe to a non-existent value + let pv_key = &self.converter.key_to_path(key); let val = Value { - pv: P::new(Box::new([]), self.persist_path.join(key))?, + pv: P::new(Box::new([]), self.converter.root().join(pv_key))?, notifier: Arc::new(Mutex::new(Notification::new())), }; let tx = val.notifier.lock().await.subscribe(uuid)?; @@ -335,7 +330,7 @@ impl NkvCore

{ #[cfg(test)] mod tests { - use crate::persist_value::FileStorage; + use crate::persist_value::{FileStorage, JsonConverter}; use super::*; use anyhow::Result; @@ -345,7 +340,8 @@ mod tests { #[tokio::test] async fn test_put_and_get() -> Result<()> { let temp_dir = TempDir::new()?; - let mut nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let mut nkv = NkvCore::::new(c)?; let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]); nkv.put("key1", data.clone()).await?; @@ -359,7 +355,8 @@ mod tests { #[tokio::test] async fn test_get_nonexistent_key() -> Result<()> { let temp_dir = TempDir::new()?; - let nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let nkv = NkvCore::::new(c)?; let result = nkv.get("nonexistent_key"); assert_eq!(result, Vec::new()); @@ -370,7 +367,8 @@ mod tests { #[tokio::test] async fn test_delete() -> Result<()> { let temp_dir = TempDir::new()?; - let mut nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let mut nkv = NkvCore::::new(c)?; let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]); nkv.put("key1", data.clone()).await?; @@ -385,7 +383,8 @@ mod tests { #[tokio::test] async fn test_update_value() -> Result<()> { let temp_dir = TempDir::new()?; - let mut nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let mut nkv = NkvCore::::new(c)?; let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]); nkv.put("key1", data).await?; @@ -408,13 +407,15 @@ mod tests { let data3: Box<[u8]> = Box::new([10, 11, 12, 13, 14]); { - let mut nkv = NkvCore::::new(path.clone())?; + let c = JsonConverter::new(path.clone())?; + let mut nkv = NkvCore::::new(c)?; nkv.put("key1", data1.clone()).await?; nkv.put("key2", data2.clone()).await?; nkv.put("key3", data3.clone()).await?; } - let nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let nkv = NkvCore::::new(c)?; let result = nkv.get("key1"); assert_eq!(result, vec!(Arc::from(data1))); @@ -430,7 +431,8 @@ mod tests { #[tokio::test] async fn test_subsribe() -> Result<()> { let temp_dir = TempDir::new()?; - let mut nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let mut nkv = NkvCore::::new(c)?; let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]); nkv.put("key1", data).await?; @@ -466,7 +468,8 @@ mod tests { #[tokio::test] async fn test_unsubsribe() -> Result<()> { let temp_dir = TempDir::new()?; - let mut nkv = NkvCore::::new(temp_dir.path().to_path_buf())?; + let c = JsonConverter::new(temp_dir.path().to_path_buf())?; + let mut nkv = NkvCore::::new(c)?; let data: Box<[u8]> = Box::new([1, 2, 3, 4, 5]); nkv.put("key1", data).await?; diff --git a/src/persist_value.rs b/src/persist_value.rs index d998e3c..e0feec8 100644 --- a/src/persist_value.rs +++ b/src/persist_value.rs @@ -4,9 +4,9 @@ // on a file system. Writing to a disk is an // atomic operation -use crate::traits::StorageEngine; +use crate::traits::{Converter, StorageEngine}; use std::fs; -use std::io::Write; +use std::io::{self, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::NamedTempFile; @@ -17,6 +17,49 @@ pub struct FileStorage { fp: PathBuf, } +pub struct JsonConverter { + root: PathBuf, +} + +impl JsonConverter { + pub fn new(root: PathBuf) -> io::Result { + if !root.is_dir() { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("{:?} is a file, not a directory", root), + )); + } + + Ok(Self { root }) + } +} + +impl Converter for JsonConverter { + fn root(&self) -> &PathBuf { + return &self.root; + } + fn key_to_path(&self, key: &str) -> String { + key.replace('.', "/") + ".json" + } + fn path_to_key(&self, input: &PathBuf) -> Option { + // Strip the provided prefix from the input path + if let Ok(stripped) = input.strip_prefix(&self.root) { + let input_str = stripped.to_string_lossy(); + + // Check if the string ends with ".json" + if input_str.ends_with(".json") { + // Remove the ".json" and replace slashes with dots + let without_json = input_str.trim_end_matches(".json"); + Some(without_json.replace('/', ".")) + } else { + None + } + } else { + None // If the prefix doesn't match, return None + } + } +} + impl StorageEngine for FileStorage { fn new(new_data: Box<[u8]>, filepath: PathBuf) -> std::io::Result { // Create parent folders if they don't exist diff --git a/src/srv.rs b/src/srv.rs index 3c7e59e..b8f78a3 100644 --- a/src/srv.rs +++ b/src/srv.rs @@ -16,7 +16,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::errors::NotifyKeyValueError; use crate::nkv::NkvCore; use crate::notifier::{Notifier, TcpWriter}; -use crate::persist_value::FileStorage; +use crate::persist_value::{FileStorage, JsonConverter}; use crate::request_msg::{self, BaseMessage, PutMessage, ServerRequest, ServerResponse}; pub struct PutMsg { @@ -72,7 +72,8 @@ impl Server { let (cancel_tx, cancel_rx) = oneshot::channel(); let (usr_cancel_tx, mut usr_cancel_rx) = oneshot::channel(); - let mut nkv = NkvCore::::new(path)?; + let c = JsonConverter::new(path)?; + let mut nkv = NkvCore::::new(c)?; let addr: SocketAddr = addr.parse().expect("Unable to parse addr"); let srv = Self { diff --git a/src/traits.rs b/src/traits.rs index 39a4e95..b3b670d 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::nkv::Notification; -// use std::fmt::{self, Debug, Error}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::Mutex; @@ -19,6 +18,12 @@ pub trait StorageEngine: PartialEq { fn data(&self) -> Arc<[u8]>; } +pub trait Converter { + fn key_to_path(&self, key: &str) -> String; + fn path_to_key(&self, input: &PathBuf) -> Option; + fn root(&self) -> &PathBuf; +} + #[derive(Debug)] pub struct Value { pub pv: V,