Skip to content

Commit

Permalink
Merge pull request nkval#1 from uncleDecart/restructure-binaries
Browse files Browse the repository at this point in the history
Restructure binaries
  • Loading branch information
uncleDecart authored Aug 15, 2024
2 parents 39df90f + e4e4fa5 commit 2a971c0
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 29 deletions.
12 changes: 12 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ tempdir = "0.3"
tempfile = "3.10.1"
tokio = { version = "1.39.1", features = ["full"] }

[lib]
name = "nkv"
path = "src/lib.rs"

[[bin]]
name = "nkv-server"
path = "src/server/main.rs"

[[bin]]
name = "nkv-client"
path = "src/client/main.rs"

[profile.release]
strip = true # Automatically strip symbols from the binary
opt-level = "z" # Optimize for size.
Expand Down
96 changes: 96 additions & 0 deletions src/client/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::env;
use std::io::{self, Write};

use nkv::request_msg;
use nkv::NatsClient;

const DEFAULT_URL: &str = "localhost:4222";

#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();

let url = if args.len() > 1 {
&args[1]
} else {
DEFAULT_URL
};
let client = NatsClient::new(&url).await.unwrap();

loop {
let mut input = String::new();

// Prompt the user for input
println!("Please enter the command words separated by whitespace, finish with a character return. Enter HELP for help:");
io::stdout().flush().unwrap(); // Ensure the prompt is shown immediately

// Read the input from the user
io::stdin().read_line(&mut input)
.expect("Failed to read line");

// Trim the input to remove any trailing newline characters
let input = input.trim();

// Split the input on whitespace
let parts: Vec<&str> = input.split_whitespace().collect();

if let Some(command) = parts.get(0) {
match *command {
"PUT" => {
if let(Some(_key), Some(_value)) = (parts.get(1), parts.get(2)) {
let byte_slice: &[u8] = _value.as_bytes();

// Convert the byte slice into a boxed slice
let boxed_bytes: Box<[u8]> = byte_slice.into();
let resp = client.put(_key.to_string(), boxed_bytes).await.unwrap();
assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{
id: 0,
status: http::StatusCode::OK,
message: "No Error".to_string(),
}))
} else {
println!("PUT requires a key and a value");
}
}
"GET" => {
if let Some(_key) = parts.get(1) {
let resp = client.get(_key.to_string()).await.unwrap();
assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{
id: 0,
status: http::StatusCode::OK,
message: "No Error".to_string(),
}))
} else {
println!("GET requires a key");
}
}
"DELETE" => {
if let Some(_key) = parts.get(1) {
let resp = client.delete(_key.to_string()).await.unwrap();
assert_eq!(resp, request_msg::ServerResponse::Base(request_msg::BaseResp{
id: 0,
status: http::StatusCode::OK,
message: "No Error".to_string(),
}))
} else {
println!("DELETE requires a key");
}
}
"QUIT" => {
break;
}
"HELP" => {
println!("Commands:");
println!("PUT key value");
println!("GET key");
println!("DELETE key");
println!("HELP");
println!("QUIT");
}
&_ => {
println!("Unknown command");
}
}
}
}
}
5 changes: 5 additions & 0 deletions src/client.rs → src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
pub mod nkv;
mod notifier;
pub mod request_msg;
mod persist_value;

use crate::request_msg::*;

pub struct NatsClient {
Expand Down
9 changes: 0 additions & 9 deletions src/main.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ extern crate serde;
extern crate serde_json;

use serde::{Deserialize, Serialize};
use std::env;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum MessageType<T>
Expand Down Expand Up @@ -92,6 +91,7 @@ impl Drop for Notifier {
#[cfg(test)]
mod tests {
use super::*;
use std::env;

// Helper function to create a Notifier instance for testing
async fn create_test_notifier() -> Notifier {
Expand Down
3 changes: 2 additions & 1 deletion src/persist_value.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tempfile::{TempDir, NamedTempFile};
use tempfile::NamedTempFile;
use std::path::{Path, PathBuf};
use anyhow::{Result, Context};
use std::sync::Arc;
Expand Down Expand Up @@ -69,6 +69,7 @@ fn atomic_write(data: &[u8], filename: &Path) -> Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

#[test]
fn test_persist_value() -> Result<()> {
Expand Down
22 changes: 22 additions & 0 deletions src/server/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::env;
use tempfile::TempDir;

mod srv;

const DEFAULT_URL: &str = "localhost:4222";

#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();

let url = if args.len() > 1 {
&args[1]
} else {
DEFAULT_URL
};

let temp_dir = TempDir::new().expect("Failed to create temporary directory");

// creates a task where it waits to serve threads
let _srv = srv::Server::new(url.to_string(), temp_dir.path().to_path_buf()).await.unwrap();
}
36 changes: 18 additions & 18 deletions src/srv.rs → src/server/srv.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use tempfile::TempDir;
use tokio::{task, sync::mpsc };

use std::sync::Arc;

use crate::nkv::{self, NotifyKeyValue};
use crate::request_msg::{self, BaseResp, DataResp, ServerResponse};
use ::nkv::nkv;
use ::nkv::request_msg::{self, BaseMessage, PutMessage, ServerResponse};
use http::StatusCode;
use futures::StreamExt;
use std::env;

pub struct PutMsg {
key: String,
Expand Down Expand Up @@ -56,7 +54,7 @@ impl Server {
let (sub_tx, mut sub_rx) = mpsc::unbounded_channel::<SubMsg>();
let (_cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<BaseMsg>();

let mut nkv = NotifyKeyValue::new(path);
let mut nkv = nkv::NotifyKeyValue::new(path);

let nc = async_nats::connect(nats_url.clone()).await?;
let mut sub = nc.subscribe("pubsub.*").await.unwrap();
Expand Down Expand Up @@ -84,7 +82,7 @@ impl Server {
"pubsub.put" => Self::handle_put(put_tx, json_body).await,
"pubsub.delete" => Self::handle_basic_msg(del_tx, json_body).await,
"pubsub.subscribe" => Self::handle_sub(sub_tx, json_body).await,
_ => ServerResponse::Base(BaseResp{id: 0, status: StatusCode::OK, message: "tmp".to_string()}),
_ => ServerResponse::Base(request_msg::BaseResp{id: 0, status: StatusCode::OK, message: "tmp".to_string()}),
};
let json_reply = serde_json::to_string(&reply).unwrap();
let _ = nc.publish(reply_to, json_reply.into()).await;
Expand Down Expand Up @@ -142,15 +140,15 @@ impl Server {
// TODO: handle error and throw response
let _ = nkv_tx.send(PutMsg{key: base.key, value: value , resp_tx: resp_tx});
let nkv_resp = resp_rx.recv().await.unwrap();
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: base.id,
status: nkv_resp.to_http_status(),
message: nkv_resp.to_string(),
};
ServerResponse::Base(resp)
}
_ => {
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: 0,
status: StatusCode::INTERNAL_SERVER_ERROR,
message: "wrong message for put handle".to_string(),
Expand All @@ -170,8 +168,8 @@ impl Server {
if let Some(v) = nkv_resp.value {
data = v.to_vec();
}
let resp = DataResp {
base: BaseResp {
let resp = request_msg::DataResp {
base: request_msg::BaseResp {
id: id,
status: nkv_resp.err.to_http_status(),
message: nkv_resp.err.to_string(),
Expand All @@ -181,7 +179,7 @@ impl Server {
ServerResponse::Get(resp)
}
_ => {
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: 0,
status: StatusCode::INTERNAL_SERVER_ERROR,
message: "wrong message for get handle".to_string(),
Expand All @@ -197,15 +195,15 @@ impl Server {
let (resp_tx, mut resp_rx) = mpsc::channel(1);
let _ = nkv_tx.send(BaseMsg{key: key, resp_tx: resp_tx});
let nkv_resp = resp_rx.recv().await.unwrap();
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: id,
status: nkv_resp.to_http_status(),
message: nkv_resp.to_string(),
};
ServerResponse::Base(resp)
}
_ => {
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: 0,
status: StatusCode::INTERNAL_SERVER_ERROR,
message: "wrong message for the handle".to_string(),
Expand All @@ -221,8 +219,8 @@ impl Server {
let (resp_tx, mut resp_rx) = mpsc::channel(1);
let _ = nkv_tx.send(SubMsg{key: key, resp_tx: resp_tx});
let nkv_resp = resp_rx.recv().await.unwrap();
let resp = DataResp {
base: BaseResp {
let resp = request_msg::DataResp {
base: request_msg::BaseResp {
id: id,
status: nkv_resp.err.to_http_status(),
message: nkv_resp.err.to_string(),
Expand All @@ -232,7 +230,7 @@ impl Server {
ServerResponse::Put(resp)
}
_ => {
let resp = BaseResp {
let resp = request_msg::BaseResp {
id: 0,
status: StatusCode::INTERNAL_SERVER_ERROR,
message: "wrong message for sub handle".to_string(),
Expand All @@ -251,9 +249,11 @@ impl Server {
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use tempfile::TempDir;
use tokio;
use crate::client::NatsClient;

use ::nkv::NatsClient;
#[tokio::test]
async fn test_server() {
let temp_dir = TempDir::new().expect("Failed to create temporary directory");
Expand Down

0 comments on commit 2a971c0

Please sign in to comment.