Skip to content

Commit

Permalink
Merge pull request nkval#4 from uncleDecart/remove-nats
Browse files Browse the repository at this point in the history
Remove nats and use TcpStream from Tokio. Less is more
  • Loading branch information
uncleDecart authored Aug 26, 2024
2 parents b91e80a + ed275da commit c188f01
Show file tree
Hide file tree
Showing 7 changed files with 491 additions and 340 deletions.
2 changes: 1 addition & 1 deletion src/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() {
} else {
DEFAULT_URL
};
let client = NatsClient::new(&url).await.unwrap();
let mut client = NatsClient::new(&url).await.unwrap();

loop {
let mut input = String::new();
Expand Down
75 changes: 34 additions & 41 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,62 @@
pub mod nkv;
mod notifier;
pub mod request_msg;
pub mod notifier;
mod persist_value;
pub mod request_msg;

use crate::request_msg::*;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::TcpStream;

pub struct NatsClient {
client: async_nats::Client,
writer: BufWriter<tokio::net::tcp::OwnedWriteHalf>,
reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
}

impl NatsClient {
pub async fn new(server: &str) -> Result<Self, async_nats::Error> {
let client = async_nats::connect(server).await?;
Ok(Self { client })
pub async fn new(addr: &str) -> tokio::io::Result<Self> {
let stream = TcpStream::connect(addr).await?;
let (read, write) = stream.into_split();
let writer = BufWriter::new(write);
let reader = BufReader::new(read);

Ok(Self { writer, reader })
}

pub async fn get(&self, key: String) -> Result<ServerResponse, async_nats::Error> {
let req = MessageBody::Get(BaseMessage{
id: 0,
key: key,
});
pub async fn get(&mut self, key: String) -> tokio::io::Result<ServerResponse> {
let req = ServerRequest::Get(BaseMessage { id: 0, key });
self.send_request(&req).await
}

pub async fn put(&self, key: String, val: Box<[u8]>) -> Result<ServerResponse, async_nats::Error> {
let req = MessageBody::Put(PutMessage{
base: BaseMessage{
id: 0,
key: key,
},
pub async fn put(&mut self, key: String, val: Box<[u8]>) -> tokio::io::Result<ServerResponse> {
let req = ServerRequest::Put(PutMessage {
base: BaseMessage { id: 0, key },
value: val,
});
self.send_request(&req).await
}

pub async fn delete(&self, key: String) -> Result<ServerResponse, async_nats::Error> {
let req = MessageBody::Delete(BaseMessage{
id: 0,
key: key,
});
pub async fn delete(&mut self, key: String) -> tokio::io::Result<ServerResponse> {
let req = ServerRequest::Delete(BaseMessage { id: 0, key });
self.send_request(&req).await
}

pub async fn subscribe(&self, key: String) -> Result<ServerResponse, async_nats::Error> {
let req = MessageBody::Subscribe(BaseMessage{
id: 0,
key: key,
});
pub async fn subscribe(&mut self, key: String) -> tokio::io::Result<ServerResponse> {
let req = ServerRequest::Subscribe(BaseMessage { id: 0, key });
self.send_request(&req).await
}
async fn send_request(&self, request: &MessageBody) -> Result<ServerResponse, async_nats::Error> {

async fn send_request(&mut self, request: &ServerRequest) -> tokio::io::Result<ServerResponse> {
let req = serde_json::to_string(&request)?;
let subj = match request {
MessageBody::Put( .. ) => "pubsub.put",
MessageBody::Get( .. ) => "pubsub.get",
MessageBody::Delete( .. ) => "pubsub.delete",
MessageBody::Subscribe( .. ) => "pubsub.subscribe",
};
let reply = self.client
.request(subj, req.into())
.await?;
self.writer.write_all(req.as_bytes()).await?;
self.writer.write_all(b"\n").await?;
self.writer.flush().await?;

let mut response_buf = String::new();

self.reader.read_line(&mut response_buf).await?;
let response: ServerResponse = serde_json::from_str(&response_buf)?;


let response_data: ServerResponse = serde_json::from_slice(&reply.payload)?;
Ok(response_data)
Ok(response)
}
}
49 changes: 27 additions & 22 deletions src/nkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::path::PathBuf;

use std::sync::Arc;

use crate::notifier::{Notifier, WriteStream};
use crate::persist_value::PersistValue;
use crate::notifier::Notifier;
use std::fmt;
use std::env;
use std::net::SocketAddr;

#[derive(Debug, PartialEq)]
pub enum NotifyKeyValueError {
Expand Down Expand Up @@ -41,57 +41,62 @@ struct Value {

pub struct NotifyKeyValue {
state: HashMap<String, Value>,
persist_path: PathBuf,
sock_path: String,
persist_path: PathBuf,
}


impl NotifyKeyValue {
pub fn new(path: std::path::PathBuf) -> Self {
let nats_url = env::var("NATS_URL")
.unwrap_or_else(|_| "nats://localhost:4222".to_string());
Self{
Self {
state: HashMap::new(),
persist_path: path,
sock_path: nats_url,
}
}

pub async fn put(&mut self, key: &str, value: Box<[u8]>) {
if let Some(val) = self.state.get_mut(key) {
let _ = val.pv.update(value);
let _ = val.notifier.send_update(&*val.pv.data());
// TODO: Maybe we can use reference?
// so that we don't have to clone
let _ = val.pv.update(value.clone());
let _ = val.notifier.send_update(value).await;
} else {
let path = self.persist_path.join(key);
let val = PersistValue::new(value, path).expect("TOREMOVE");
let notifier = Notifier::new();

let notifier = Notifier::new(self.sock_path.clone(), format!("pub-{}", key).into()).await.expect("TOREMOVE");
self.state.insert(key.to_string(), Value{
pv: val,
notifier: notifier,
});
self.state
.insert(key.to_string(), Value { pv: val, notifier });
}
}

pub fn get(&self, key: &str) -> Option<Arc<[u8]>> {
self.state.get(key).map(|value| Arc::clone(&value.pv.data()))
self.state
.get(key)
.map(|value| Arc::clone(&value.pv.data()))
}

pub fn delete(&mut self, key: &str) {
self.state.remove(key);
}

pub fn subscribe(&mut self, key: &str) -> Option<String> {
self.state.get(key).map(|value| value.notifier.topic())
pub async fn subscribe(&mut self, key: &str, addr: SocketAddr, stream: WriteStream) {
if let Some(val) = self.state.get_mut(key) {
val.notifier.subscribe(addr, stream).await;
}
}

pub async fn unsubscribe(&mut self, key: &str, addr: &SocketAddr) {
if let Some(val) = self.state.get_mut(key) {
val.notifier.unsubscribe(addr).await;
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio;
use anyhow::Result;
use tempfile::TempDir;
use tokio;

#[tokio::test]
async fn test_put_and_get() -> Result<()> {
Expand All @@ -103,9 +108,9 @@ mod tests {

let result = nkv.get("key1");
assert_eq!(result, Some(Arc::from(data)));

Ok(())
}
}

#[tokio::test]
async fn test_get_nonexistent_key() -> Result<()> {
Expand Down
Loading

0 comments on commit c188f01

Please sign in to comment.