Skip to content

Commit

Permalink
misc: clippy
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <[email protected]>
  • Loading branch information
Miaxos committed Jan 7, 2024
1 parent e0c6a58 commit c4fecbf
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 79 deletions.
4 changes: 3 additions & 1 deletion app/roster/src/application/server/cmd/client/set_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::application::server::frame::Frame;
/// Note that these attributes are not cleared by the RESET command.
#[derive(Debug, Default)]
pub struct ClientSetInfo {
#[allow(dead_code)]
lib_name: Option<Bytes>,
#[allow(dead_code)]
lib_version: Option<Bytes>,
}

Expand Down Expand Up @@ -98,7 +100,7 @@ impl ClientSetInfo {
pub(crate) async fn apply(
self,
dst: &mut WriteConnection,
ctx: Context,
_ctx: Context,
) -> anyhow::Result<()> {
monoio::time::sleep(Duration::from_secs(3)).await;
/*
Expand Down
1 change: 0 additions & 1 deletion app/roster/src/application/server/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytestring::ByteString;
use tracing::info;

use super::parse::Parse;
use super::CommandExecution;
Expand Down
4 changes: 2 additions & 2 deletions app/roster/src/application/server/cmd/parse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{str, vec};

use bytes::{Buf, Bytes};
use bytes::Bytes;
use bytestring::ByteString;

use crate::application::server::frame::Frame;
Expand Down Expand Up @@ -94,7 +94,7 @@ impl Parse {
//
// Although errors are stored as strings and could be represented as
// raw bytes, they are considered separate types.
Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())),
Frame::Simple(s) => Ok(s.into_bytes()),
Frame::Bulk(data) => Ok(data),
frame => Err(format!(
"protocol error; expected simple frame or bulk frame, got {:?}",
Expand Down
3 changes: 1 addition & 2 deletions app/roster/src/application/server/cmd/ping.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use bytes::Bytes;
use bytestring::ByteString;
use tracing::info;

use super::parse::{Parse, ParseError};
use super::CommandExecution;
Expand Down Expand Up @@ -58,7 +57,7 @@ impl CommandExecution for Ping {
async fn apply(
self,
dst: &mut WriteConnection,
ctx: Context,
_ctx: Context,
) -> anyhow::Result<()> {
let response = match self.msg {
None => Frame::Simple(ByteString::from_static("PONG")),
Expand Down
14 changes: 3 additions & 11 deletions app/roster/src/application/server/cmd/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use std::time::Duration;

use bytes::Bytes;
use bytestring::ByteString;
use monoio::time::Instant;
use tracing::info;

use super::parse::{Parse, ParseError};
use super::CommandExecution;
Expand Down Expand Up @@ -53,7 +51,7 @@ pub struct Set {
expire: Option<Duration>,
}

const OK_STR: ByteString = ByteString::from_static("OK");
static OK_STR: ByteString = ByteString::from_static("OK");

impl Set {
/// Parse a `Set` instance from a received frame.
Expand Down Expand Up @@ -125,21 +123,15 @@ impl CommandExecution for Set {
dst: &mut WriteConnection,
ctx: Context,
) -> anyhow::Result<()> {
/*
let expired = match self.expire {
Some(dur) => Some(ctx.now() + dur.into()),
None => None,
};
*/
let expired = None;
let expired = self.expire.map(|dur| ctx.now() + dur.into());

// let now = Instant::now();
let response = match ctx
.storage
.set_async(self.key, self.value, SetOptions { expired })
.await
{
Ok(_) => Frame::Simple(OK_STR),
Ok(_) => Frame::Simple(OK_STR.clone()),
Err(_) => Frame::Null,
};
// let elapsed = now.elapsed();
Expand Down
2 changes: 1 addition & 1 deletion app/roster/src/application/server/cmd/unknown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl CommandExecution for Unknown {
async fn apply(
self,
dst: &mut WriteConnection,
ctx: Context,
_ctx: Context,
) -> anyhow::Result<()> {
let response = Frame::Error(ByteString::from(format!(
"ERR unknown command '{}'",
Expand Down
4 changes: 2 additions & 2 deletions app/roster/src/application/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use monoio::io::{
};
use monoio::net::TcpStream;

use super::frame::{self, Frame};
use super::frame::Frame;

/// Send and receive `Frame` values from a remote peer.
///
Expand Down Expand Up @@ -205,7 +205,7 @@ impl WriteConnection {
}
Frame::Integer(val) => {
self.stream_w.write(&[b':']).await.0?;
self.write_decimal(*val as u64).await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream_w.write(b"$-1\r\n").await.0?;
Expand Down
2 changes: 0 additions & 2 deletions app/roster/src/application/server/context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::cell::Cell;
use std::rc::Rc;
use std::sync::Arc;

use coarsetime::Instant;

Expand Down
14 changes: 5 additions & 9 deletions app/roster/src/application/server/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
//! parsing frames from a byte array.

use std::convert::TryInto;
use std::fmt;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;

use bytes::{Buf, Bytes, BytesMut};
use bytestring::ByteString;
use monoio::buf::{IoBuf, Slice};

/// A frame in the Redis protocol.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -103,7 +101,7 @@ impl Frame {
if b'-' == peek_u8(src)? {
let line = get_line(src)?;

if &line != b"-1".as_slice() {
if line != b"-1".as_slice() {
return Err(
"protocol error; invalid frame format".into()
);
Expand Down Expand Up @@ -220,7 +218,7 @@ fn get_decimal_mut(src: &mut Cursor<&BytesMut>) -> Result<u64, Error> {

/// Find a line
#[inline]
fn get_line<'a>(src: &mut Cursor<Bytes>) -> Result<Bytes, Error> {
fn get_line(src: &mut Cursor<Bytes>) -> Result<Bytes, Error> {
// Scan the bytes directly
let start = src.position() as usize;
// Scan to the second to last byte
Expand All @@ -240,9 +238,7 @@ fn get_line<'a>(src: &mut Cursor<Bytes>) -> Result<Bytes, Error> {
}

#[inline]
fn get_line_mut_no_return<'a>(
src: &mut Cursor<&BytesMut>,
) -> Result<(), Error> {
fn get_line_mut_no_return(src: &mut Cursor<&BytesMut>) -> Result<(), Error> {
// Scan the bytes directly
let start = src.position() as usize;
// Scan to the second to last byte
Expand All @@ -260,8 +256,8 @@ fn get_line_mut_no_return<'a>(
}

#[inline]
fn get_line_mut<'a>(
src: &'a mut Cursor<&BytesMut>,
fn get_line_mut(
src: &mut Cursor<&BytesMut>,
) -> Result<std::ops::Range<usize>, Error> {
// Scan the bytes directly
let start = src.position() as usize;
Expand Down
4 changes: 2 additions & 2 deletions app/roster/src/application/server/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ impl Handler {

monoio::select! {
r = accepting_frames_handle => {
return r;
r
}
r = answer_in_order_handle => {
return r;
r
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions app/roster/src/application/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
//! The whole redis server implementation is here.
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::rc::Rc;
use std::sync::atomic::AtomicU16;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

use derive_builder::Builder;
use monoio::net::{ListenerConfig, TcpListener};
use thread_priority::{set_current_thread_priority, ThreadPriorityValue};
use tracing::{error, info};

mod connection;
mod context;
Expand All @@ -28,6 +23,7 @@ use crate::domain;
#[builder(pattern = "owned", setter(into, strip_option))]
pub struct ServerConfig {
bind_addr: SocketAddr,
#[allow(dead_code)]
connections_limit: Arc<AtomicU16>,
}

Expand Down Expand Up @@ -94,7 +90,7 @@ impl ServerConfig {
let (connection, r) =
WriteConnection::new(conn, 4 * 1024);

let mut handler = Handler {
let handler = Handler {
connection,
connection_r: r,
};
Expand Down
42 changes: 3 additions & 39 deletions app/roster/src/domain/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

use std::hash::BuildHasherDefault;
use std::rc::Rc;
use std::sync::atomic::{AtomicU16, AtomicU32};
use std::time::SystemTime;
use std::sync::atomic::AtomicU32;

use bytes::Bytes;
use bytestring::ByteString;
use coarsetime::Instant;
use rustc_hash::{FxHashMap, FxHasher};
use rustc_hash::FxHasher;
use scc::HashMap;

// We disallow Send just to be sure
Expand All @@ -23,7 +22,7 @@ pub struct StorageValue {
/// Storage
#[derive(Default, Debug, Clone)]
pub struct Storage {
db: Rc<HashMap<ByteString, StorageValue>>,
db: Rc<HashMap<ByteString, StorageValue, BuildHasherDefault<FxHasher>>>,
count: Rc<AtomicU32>,
}

Expand All @@ -43,8 +42,6 @@ impl Storage {
4096,
Default::default(),
)),
// db: HashMap<ByteString, StorageValue,
// BuildHasherDefault<FxHasher>>,
count: Rc::new(AtomicU32::new(0)),
}
}
Expand Down Expand Up @@ -86,39 +83,6 @@ impl Storage {
}
}

/// Set a key
pub fn set(
&self,
key: ByteString,
val: Bytes,
opt: SetOptions,
) -> Result<Option<StorageValue>, (String, StorageValue)> {
let val = StorageValue {
expired: opt.expired,
val,
};

let old = self
.count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Simulate some eviction mechanisme when we have too many keys
if old > 400_000 {
// dbg!("remove");
// TODO: If the RC is for the DB instead, we could have a spawn from
// monoio for this task instead, it would save us some
// time for the p99.9
self.db.retain(|_, _| false);
self.count.swap(0, std::sync::atomic::Ordering::Relaxed);
}

if let Err((key, val)) = self.db.insert(key, val) {
let old = self.db.update(&key, |_, _| val);
Ok(old)
} else {
Ok(None)
}
}

/// Get a key
///
/// Return None if it doesn't exist
Expand Down
1 change: 1 addition & 0 deletions app/roster/src/infrastructure/instruments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tracing_subscriber::{self};
#[must_use]
pub struct Instruments {}

#[allow(dead_code)]
impl Instruments {
/// Create a new `Instruments` stack and register it globally.
pub fn new() -> anyhow::Result<Self> {
Expand Down
2 changes: 1 addition & 1 deletion app/roster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;

use application::server::ServerConfigBuilder;
use infrastructure::config::Cfg;
use infrastructure::instruments::Instruments;
// use infrastructure::instruments::Instruments;

fn main() -> anyhow::Result<()> {
// Initialize config
Expand Down

0 comments on commit c4fecbf

Please sign in to comment.