Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement resp INCR command #62

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/entrystore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ protocol-common = { path = "../protocol/common" }
protocol-memcache = { path = "../protocol/memcache" }
protocol-ping = { path = "../protocol/ping" }
protocol-resp = { path = "../protocol/resp" }
seg = { path = "../storage/seg" }
seg = { path = "../storage/seg" }
storage-types = { path = "../storage/types" }
35 changes: 34 additions & 1 deletion src/entrystore/src/seg/resp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ use protocol_common::*;

use protocol_resp::*;

use seg::Value::{Bytes, U64};
use std::time::Duration;
use storage_types::parse_signed_redis;
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved

impl Execute<Request, Response> for Seg {
fn execute(&mut self, request: &Request) -> Response {
match request {
Request::Get(get) => self.get(get),
Request::Set(set) => self.set(set),
Request::Incr(incr) => self.incr(incr),
_ => Response::error("not supported"),
}
}
Expand All @@ -39,15 +42,45 @@ impl Storage for Seg {
ExpireTime::Seconds(n) => n,
_ => 0,
};
let value = match parse_signed_redis(set.value()) {
Some(integer) => U64(integer as u64),
None => Bytes(set.value()),
};

if self
.data
.insert(set.key(), set.value(), None, Duration::from_secs(ttl))
.insert(set.key(), value, None, Duration::from_secs(ttl))
.is_ok()
{
Response::simple_string("OK")
} else {
Response::error("not stored")
}
}

fn incr(&mut self, incr: &Incr) -> Response {
if let Some(mut item) = self.data.get(incr.key()) {
brayniac marked this conversation as resolved.
Show resolved Hide resolved
match item.value() {
seg::Value::Bytes(b) => Response::error("wrong data type"),
seg::Value::U64(uint) => {
if let Some(incremented) = (uint as i64).checked_add(1) {
item.wrapping_add(1);
brayniac marked this conversation as resolved.
Show resolved Hide resolved
Response::integer(incremented)
} else {
Response::error("increment or decrement would overflow")
}
}
}
} else {
if self
.data
.insert(incr.key(), 1 as u64, None, Duration::from_secs(0))
.is_ok()
{
Response::integer(1)
} else {
Response::error("not stored")
}
}
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved
}
}
127 changes: 127 additions & 0 deletions src/protocol/resp/src/request/incr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2022 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use super::*;
use logger::klog;
use std::io::{Error, ErrorKind};
use std::sync::Arc;

#[derive(Debug, PartialEq, Eq)]
pub struct Incr {
key: Arc<[u8]>,
}

impl TryFrom<Message> for Incr {
type Error = Error;

fn try_from(other: Message) -> Result<Self, Error> {
hderms marked this conversation as resolved.
Show resolved Hide resolved
if let Message::Array(array) = other {
if array.inner.is_none() {
return Err(Error::new(ErrorKind::Other, "malformed command"));
}

let mut array = array.inner.unwrap();

if array.len() != 2 {
return Err(Error::new(ErrorKind::Other, "malformed command"));
}

let key = if let Message::BulkString(key) = array.remove(1) {
if key.inner.is_none() {
return Err(Error::new(ErrorKind::Other, "malformed command"));
}

let key = key.inner.unwrap();

if key.len() == 0 {
return Err(Error::new(ErrorKind::Other, "malformed command"));
}

key
} else {
return Err(Error::new(ErrorKind::Other, "malformed command"));
};

Ok(Self { key })
} else {
Err(Error::new(ErrorKind::Other, "malformed command"))
}
}
}

impl Incr {
pub fn new(key: &[u8]) -> Self {
Self { key: key.into() }
}

pub fn key(&self) -> &[u8] {
&self.key
}
}

impl From<&Incr> for Message {
fn from(other: &Incr) -> Message {
Message::Array(Array {
inner: Some(vec![
Message::BulkString(BulkString::new(b"INCR")),
Message::BulkString(BulkString::from(other.key.clone())),
]),
})
}
}

impl Compose for Incr {
fn compose(&self, buf: &mut dyn BufMut) -> usize {
let message = Message::from(self);
message.compose(buf)
}
}

impl Klog for Incr {
hderms marked this conversation as resolved.
Show resolved Hide resolved
type Response = Response;

fn klog(&self, response: &Self::Response) {
let (code, len) = match response {
Message::BulkString(_) if *response == Response::null() => (ResponseCode::Miss, 0),
Message::BulkString(s) => (ResponseCode::Hit, s.len()),
_ => (ResponseCode::Miss, 0),
};

klog!(
"\"incr {}\" {} {}",
string_key(self.key()),
code as u32,
len
);
}
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn parser() {
let parser = RequestParser::new();
assert_eq!(
parser.parse(b"incr 0\r\n").unwrap().into_inner(),
Request::Incr(Incr::new(b"0"))
);

assert_eq!(
parser
.parse(b"incr \"\0\r\n key\"\r\n")
.unwrap()
.into_inner(),
Request::Incr(Incr::new(b"\0\r\n key"))
);

assert_eq!(
parser
.parse(b"*2\r\n$3\r\nincr\r\n$1\r\n0\r\n")
.unwrap()
.into_inner(),
Request::Incr(Incr::new(b"0"))
);
}
}
12 changes: 12 additions & 0 deletions src/protocol/resp/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod hlen;
mod hmget;
mod hset;
mod hvals;
mod incr;
mod lindex;
mod llen;
mod lpop;
Expand Down Expand Up @@ -68,6 +69,7 @@ pub use hlen::*;
pub use hmget::*;
pub use hset::*;
pub use hvals::*;
pub use incr::*;
pub use sadd::*;
pub use set::*;

Expand Down Expand Up @@ -191,6 +193,7 @@ impl Parse<Request> for RequestParser {
Some(b"hincrby") | Some(b"HINCRBY") => {
HashIncrBy::try_from(message).map(Request::from)
}
Some(b"incr") | Some(b"INCR") => Incr::try_from(message).map(Request::from),
Some(b"lindex") | Some(b"LINDEX") => {
ListIndex::try_from(message).map(Request::from)
}
Expand Down Expand Up @@ -263,6 +266,7 @@ impl Compose for Request {
Self::HashSet(r) => r.compose(buf),
Self::HashValues(r) => r.compose(buf),
Self::HashIncrBy(r) => r.compose(buf),
Self::Incr(r) => r.compose(buf),
Self::ListIndex(r) => r.compose(buf),
Self::ListLen(r) => r.compose(buf),
Self::ListPop(r) => r.compose(buf),
Expand Down Expand Up @@ -297,6 +301,7 @@ pub enum Request {
HashSet(HashSet),
HashValues(HashValues),
HashIncrBy(HashIncrBy),
Incr(Incr),
ListIndex(ListIndex),
ListLen(ListLen),
ListPop(ListPop),
Expand Down Expand Up @@ -396,6 +401,7 @@ impl Request {
Self::HashSet(_) => "hset",
Self::HashValues(_) => "hvals",
Self::HashIncrBy(_) => "hincrby",
Self::Incr(_) => "incr",
Self::ListIndex(_) => "lindex",
Self::ListLen(_) => "llen",
Self::ListPop(_) => "lpop",
Expand Down Expand Up @@ -488,6 +494,12 @@ impl From<HashIncrBy> for Request {
}
}

impl From<Incr> for Request {
fn from(value: Incr) -> Self {
Self::Incr(value)
}
}

impl From<ListIndex> for Request {
fn from(value: ListIndex) -> Self {
Self::ListIndex(value)
Expand Down
1 change: 1 addition & 0 deletions src/protocol/resp/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ use crate::*;
pub trait Storage {
fn get(&mut self, request: &Get) -> Response;
fn set(&mut self, request: &Set) -> Response;
fn incr(&mut self, request: &Incr) -> Response;
}
10 changes: 9 additions & 1 deletion src/storage/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@ edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }

[[bench]]
name = "parsing"
path = "benches/benchmark.rs"
harness = false
[dependencies]


[dev-dependencies]
criterion = "0.3.4"

55 changes: 55 additions & 0 deletions src/storage/types/benches/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Pelikan Foundation LLC.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use std::num::ParseIntError;
use storage_types::*;

use std::time::Duration;

fn parse_redis_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("parse_signed_redis");
group.measurement_time(Duration::from_secs(2));

let examples = vec![
("max_value", "9223372036854775807"),
("min_value", "-9223372036854775807"),
("average_value 6 bytes", "123456"),
("average_value 7 bytes", "1234567"),
("average_value 8 bytes", "12345678"),
("zero_value", "0"),
];

//passing cases
group.throughput(Throughput::Elements(1));
for (label, value) in examples {
let bytes = value.as_bytes();
group.bench_with_input(format!("parse i64: {}", label), &bytes, |b, &bytes| {
b.iter(|| {
let result = parse_signed_redis(bytes);
assert!(result.is_some());
})
});
}

let examples = vec![("overflowed_value", "92233720368547758079223372036854775807")];
//non-passing cases
group.throughput(Throughput::Elements(1));
for (label, value) in examples {
let bytes = value.as_bytes();
group.bench_with_input(
format!("parse (failed) i64: {}", label),
&bytes,
|b, &bytes| {
b.iter(|| {
let result = parse_signed_redis(bytes);
assert!(result.is_none());
})
},
);
}
}

criterion_group!(benches, parse_redis_benchmark);
criterion_main!(benches);
3 changes: 3 additions & 0 deletions src/storage/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

mod parse;
pub use parse::*;

#[derive(PartialEq, Eq)]
pub enum Value<'a> {
Bytes(&'a [u8]),
Expand Down
Loading