From e4e4fa5d1882df05233b4f63f2aa4566c75c256e Mon Sep 17 00:00:00 2001 From: Avi Deitcher Date: Tue, 13 Aug 2024 18:35:04 +0300 Subject: [PATCH] client and server binaries and lib Signed-off-by: Avi Deitcher --- Cargo.toml | 12 +++++ src/client/main.rs | 96 +++++++++++++++++++++++++++++++++++++++ src/{client.rs => lib.rs} | 5 ++ src/main.rs | 9 ---- src/notifier.rs | 2 +- src/persist_value.rs | 3 +- src/server/main.rs | 22 +++++++++ src/{ => server}/srv.rs | 36 +++++++-------- 8 files changed, 156 insertions(+), 29 deletions(-) create mode 100644 src/client/main.rs rename src/{client.rs => lib.rs} (96%) delete mode 100644 src/main.rs create mode 100644 src/server/main.rs rename src/{ => server}/srv.rs (94%) diff --git a/Cargo.toml b/Cargo.toml index 25c0c62..63029c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,18 @@ tempdir = "0.3" tempfile = "3.10.1" tokio = { version = "1.39.1", features = ["full"] } +[lib] +name = "nkv" +path = "src/lib.rs" + +[[bin]] +name = "nkv-server" +path = "src/server/main.rs" + +[[bin]] +name = "nkv-client" +path = "src/client/main.rs" + [profile.release] strip = true # Automatically strip symbols from the binary opt-level = "z" # Optimize for size. diff --git a/src/client/main.rs b/src/client/main.rs new file mode 100644 index 0000000..58c1559 --- /dev/null +++ b/src/client/main.rs @@ -0,0 +1,96 @@ +use std::env; +use std::io::{self, Write}; + +use nkv::request_msg; +use nkv::NatsClient; + +const DEFAULT_URL: &str = "localhost:4222"; + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + + let url = if args.len() > 1 { + &args[1] + } else { + DEFAULT_URL + }; + let client = NatsClient::new(&url).await.unwrap(); + + loop { + let mut input = String::new(); + + // Prompt the user for input + println!("Please enter the command words separated by whitespace, finish with a character return. Enter HELP for help:"); + io::stdout().flush().unwrap(); // Ensure the prompt is shown immediately + + // Read the input from the user + io::stdin().read_line(&mut input) + .expect("Failed to read line"); + + // Trim the input to remove any trailing newline characters + let input = input.trim(); + + // Split the input on whitespace + let parts: Vec<&str> = input.split_whitespace().collect(); + + if let Some(command) = parts.get(0) { + match *command { + "PUT" => { + if let(Some(_key), Some(_value)) = (parts.get(1), parts.get(2)) { + let byte_slice: &[u8] = _value.as_bytes(); + + // Convert the byte slice into a boxed slice + let boxed_bytes: Box<[u8]> = byte_slice.into(); + let resp = client.put(_key.to_string(), boxed_bytes).await.unwrap(); + assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{ + id: 0, + status: http::StatusCode::OK, + message: "No Error".to_string(), + })) + } else { + println!("PUT requires a key and a value"); + } + } + "GET" => { + if let Some(_key) = parts.get(1) { + let resp = client.get(_key.to_string()).await.unwrap(); + assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{ + id: 0, + status: http::StatusCode::OK, + message: "No Error".to_string(), + })) + } else { + println!("GET requires a key"); + } + } + "DELETE" => { + if let Some(_key) = parts.get(1) { + let resp = client.delete(_key.to_string()).await.unwrap(); + assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{ + id: 0, + status: http::StatusCode::OK, + message: "No Error".to_string(), + })) + } else { + println!("DELETE requires a key"); + } + } + "QUIT" => { + break; + } + "HELP" => { + println!("Commands:"); + println!("PUT key value"); + println!("GET key"); + println!("DELETE key"); + println!("HELP"); + println!("QUIT"); + } + &_ => { + println!("Unknown command"); + } + } + } + } +} diff --git a/src/client.rs b/src/lib.rs similarity index 96% rename from src/client.rs rename to src/lib.rs index ccbc800..82627d0 100644 --- a/src/client.rs +++ b/src/lib.rs @@ -1,3 +1,8 @@ +pub mod nkv; +mod notifier; +pub mod request_msg; +mod persist_value; + use crate::request_msg::*; pub struct NatsClient { diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 8431d16..0000000 --- a/src/main.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod persist_value; -mod nkv; -mod notifier; -mod srv; -mod request_msg; -mod client; - -#[tokio::main] -async fn main() { } diff --git a/src/notifier.rs b/src/notifier.rs index 6a10c18..ece1db7 100644 --- a/src/notifier.rs +++ b/src/notifier.rs @@ -2,7 +2,6 @@ extern crate serde; extern crate serde_json; use serde::{Deserialize, Serialize}; -use std::env; #[derive(Serialize, Deserialize, Debug, PartialEq)] enum MessageType @@ -92,6 +91,7 @@ impl Drop for Notifier { #[cfg(test)] mod tests { use super::*; + use std::env; // Helper function to create a Notifier instance for testing async fn create_test_notifier() -> Notifier { diff --git a/src/persist_value.rs b/src/persist_value.rs index 6b1e6c6..4b25735 100644 --- a/src/persist_value.rs +++ b/src/persist_value.rs @@ -1,4 +1,4 @@ -use tempfile::{TempDir, NamedTempFile}; +use tempfile::NamedTempFile; use std::path::{Path, PathBuf}; use anyhow::{Result, Context}; use std::sync::Arc; @@ -69,6 +69,7 @@ fn atomic_write(data: &[u8], filename: &Path) -> Result<()> { #[cfg(test)] mod tests { use super::*; + use tempfile::TempDir; #[test] fn test_persist_value() -> Result<()> { diff --git a/src/server/main.rs b/src/server/main.rs new file mode 100644 index 0000000..739c34d --- /dev/null +++ b/src/server/main.rs @@ -0,0 +1,22 @@ +use std::env; +use tempfile::TempDir; + +mod srv; + +const DEFAULT_URL: &str = "localhost:4222"; + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + + let url = if args.len() > 1 { + &args[1] + } else { + DEFAULT_URL + }; + + let temp_dir = TempDir::new().expect("Failed to create temporary directory"); + + // creates a task where it waits to serve threads + let _srv = srv::Server::new(url.to_string(), temp_dir.path().to_path_buf()).await.unwrap(); +} diff --git a/src/srv.rs b/src/server/srv.rs similarity index 94% rename from src/srv.rs rename to src/server/srv.rs index bf7b957..470500b 100644 --- a/src/srv.rs +++ b/src/server/srv.rs @@ -1,13 +1,11 @@ -use tempfile::TempDir; use tokio::{task, sync::mpsc }; use std::sync::Arc; -use crate::nkv::{self, NotifyKeyValue}; -use crate::request_msg::{self, BaseResp, DataResp, ServerResponse}; +use ::nkv::nkv; +use ::nkv::request_msg::{self, BaseMessage, PutMessage, ServerResponse}; use http::StatusCode; use futures::StreamExt; -use std::env; pub struct PutMsg { key: String, @@ -56,7 +54,7 @@ impl Server { let (sub_tx, mut sub_rx) = mpsc::unbounded_channel::(); let (_cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::(); - let mut nkv = NotifyKeyValue::new(path); + let mut nkv = nkv::NotifyKeyValue::new(path); let nc = async_nats::connect(nats_url.clone()).await?; let mut sub = nc.subscribe("pubsub.*").await.unwrap(); @@ -84,7 +82,7 @@ impl Server { "pubsub.put" => Self::handle_put(put_tx, json_body).await, "pubsub.delete" => Self::handle_basic_msg(del_tx, json_body).await, "pubsub.subscribe" => Self::handle_sub(sub_tx, json_body).await, - _ => ServerResponse::Base(BaseResp{id: 0, status: StatusCode::OK, message: "tmp".to_string()}), + _ => ServerResponse::Base(request_msg::BaseResp{id: 0, status: StatusCode::OK, message: "tmp".to_string()}), }; let json_reply = serde_json::to_string(&reply).unwrap(); let _ = nc.publish(reply_to, json_reply.into()).await; @@ -142,7 +140,7 @@ impl Server { // TODO: handle error and throw response let _ = nkv_tx.send(PutMsg{key: base.key, value: value , resp_tx: resp_tx}); let nkv_resp = resp_rx.recv().await.unwrap(); - let resp = BaseResp { + let resp = request_msg::BaseResp { id: base.id, status: nkv_resp.to_http_status(), message: nkv_resp.to_string(), @@ -150,7 +148,7 @@ impl Server { ServerResponse::Base(resp) } _ => { - let resp = BaseResp { + let resp = request_msg::BaseResp { id: 0, status: StatusCode::INTERNAL_SERVER_ERROR, message: "wrong message for put handle".to_string(), @@ -170,8 +168,8 @@ impl Server { if let Some(v) = nkv_resp.value { data = v.to_vec(); } - let resp = DataResp { - base: BaseResp { + let resp = request_msg::DataResp { + base: request_msg::BaseResp { id: id, status: nkv_resp.err.to_http_status(), message: nkv_resp.err.to_string(), @@ -181,7 +179,7 @@ impl Server { ServerResponse::Get(resp) } _ => { - let resp = BaseResp { + let resp = request_msg::BaseResp { id: 0, status: StatusCode::INTERNAL_SERVER_ERROR, message: "wrong message for get handle".to_string(), @@ -197,7 +195,7 @@ impl Server { let (resp_tx, mut resp_rx) = mpsc::channel(1); let _ = nkv_tx.send(BaseMsg{key: key, resp_tx: resp_tx}); let nkv_resp = resp_rx.recv().await.unwrap(); - let resp = BaseResp { + let resp = request_msg::BaseResp { id: id, status: nkv_resp.to_http_status(), message: nkv_resp.to_string(), @@ -205,7 +203,7 @@ impl Server { ServerResponse::Base(resp) } _ => { - let resp = BaseResp { + let resp = request_msg::BaseResp { id: 0, status: StatusCode::INTERNAL_SERVER_ERROR, message: "wrong message for the handle".to_string(), @@ -221,8 +219,8 @@ impl Server { let (resp_tx, mut resp_rx) = mpsc::channel(1); let _ = nkv_tx.send(SubMsg{key: key, resp_tx: resp_tx}); let nkv_resp = resp_rx.recv().await.unwrap(); - let resp = DataResp { - base: BaseResp { + let resp = request_msg::DataResp { + base: request_msg::BaseResp { id: id, status: nkv_resp.err.to_http_status(), message: nkv_resp.err.to_string(), @@ -232,7 +230,7 @@ impl Server { ServerResponse::Put(resp) } _ => { - let resp = BaseResp { + let resp = request_msg::BaseResp { id: 0, status: StatusCode::INTERNAL_SERVER_ERROR, message: "wrong message for sub handle".to_string(), @@ -251,9 +249,11 @@ impl Server { #[cfg(test)] mod tests { use super::*; + use std::env; + use tempfile::TempDir; use tokio; - use crate::client::NatsClient; - + use ::nkv::NatsClient; + #[tokio::test] async fn test_server() { let temp_dir = TempDir::new().expect("Failed to create temporary directory");