Skip to content

Commit

Permalink
feat: add client list capabilities
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <[email protected]>
  • Loading branch information
Miaxos committed Jan 29, 2024
1 parent ef55c28 commit 4807117
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 11 deletions.
23 changes: 18 additions & 5 deletions app/roster/src/application/server/cmd/client/list.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::bail;
use bytestring::ByteString;

use super::super::parse::Parse;
use crate::application::server::cmd::parse::ParseError;
Expand Down Expand Up @@ -99,14 +100,26 @@ impl ClientList {
dst: &mut WriteConnection,
ctx: Context,
) -> anyhow::Result<()> {
/*
let id = ctx.connection.id();
let connections = ctx.supervisor.get_normal_connection().await;

let response = Frame::Integer(id);
// TODO(@miaxos): lot of things missing here
let conn_frames = connections
.into_iter()
.map(|x| {
ByteString::from(format!(
"id={id} addr={addr} laddr={laddr} fd={fd}",
id = &x.id,
addr = &x.addr,
laddr = &x.laddr,
fd = &x.fd
))
})
.map(Frame::Simple)
.collect();

let response = Frame::Array(conn_frames);
dst.write_frame(&response).await?;
Ok(())
*/
unimplemented!()
}
}

Expand Down
14 changes: 11 additions & 3 deletions app/roster/src/application/server/server_thread.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::os::fd::AsRawFd;
use std::rc::Rc;
use std::thread::JoinHandle;

Expand Down Expand Up @@ -79,16 +80,21 @@ impl ServerMonoThreadedHandle {
let shard = shard.clone();

// We accept the TCP Connection
let (conn, _addr) = listener
let (conn, addr) = listener
.accept()
.await
.expect("Unable to accept connections");

let meta_conn = self.supervisor.assign_new_connection();
let laddr = conn.local_addr()?;
let fd = conn.as_raw_fd();

let meta_conn =
self.supervisor.assign_new_connection(addr, laddr, fd);
let supervisor = self.supervisor.clone();

conn.set_nodelay(true).unwrap();
let ctx = Context::new(storage, supervisor, meta_conn);
let ctx =
Context::new(storage, supervisor, meta_conn.clone());

// We map it to an `Handler` which is able to understand
// the Redis protocol
Expand All @@ -108,6 +114,8 @@ impl ServerMonoThreadedHandle {
// error!(?err);
panic!("blbl");
}

meta_conn.stop();
// handler.connection.stop().await.unwrap();
});
}
Expand Down
60 changes: 57 additions & 3 deletions app/roster/src/application/server/supervisor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::atomic::AtomicU64;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;

use scc::HashMap;
Expand Down Expand Up @@ -37,23 +38,76 @@ impl Supervisor {

/// Assign a new connection to the [Supervisor] and return a
/// [MetadataConnection]
pub fn assign_new_connection(&self) -> Arc<MetadataConnection> {
pub fn assign_new_connection(
&self,
addr: SocketAddr,
laddr: SocketAddr,
fd: i32,
) -> Arc<MetadataConnection> {
let id = self
.current_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let conn = Arc::new(MetadataConnection { id });
let conn = Arc::new(MetadataConnection {
id,
kind: MetadataConnectionKind::NORMAL,
stopped: AtomicBool::new(false),
addr,
laddr,
fd,
});
self.current_connections
.insert(id, conn.clone())
.expect("Can't fail");
conn
}

/// Get the list of [MetadataConnection] for normal connection.
pub async fn get_normal_connection(&self) -> Vec<Arc<MetadataConnection>> {
let mut result = Vec::new();

self.current_connections
.scan_async(|_, conn| {
let stopped =
conn.stopped.load(std::sync::atomic::Ordering::Relaxed);

if !stopped && conn.kind == MetadataConnectionKind::NORMAL {
result.push(conn.clone());
}
})
.await;

result
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum MetadataConnectionKind {
NORMAL,
}

/// [MetadataConnection] is where we store metadata about a Connection.
#[derive(Debug)]
pub struct MetadataConnection {
/// The associated ID for the connection
pub id: u64,
/// Describe the connection kind
pub kind: MetadataConnectionKind,
/// Tell if the connection is stopped
pub stopped: AtomicBool,
/// Address/Port of the client
pub addr: SocketAddr,
/// address/port of local address client connected to (bind address)
pub laddr: SocketAddr,
/// file descriptor corresponding to the socket
pub fd: i32,
}

impl MetadataConnection {
/// Indicate this connection is stopped.
pub fn stop(&self) {
self.stopped
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}

impl MetadataConnection {
Expand Down
2 changes: 2 additions & 0 deletions docs/cmd_list.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
- [ ] CLIENT INFO
- [ ] CLIENT KILL
- [ ] CLIENT LIST
A lot is missing right now but it's partially working, not every arguments are
supported yet.
- [ ] CLIENT NO EVICT
- [ ] CLIENT NO TOUCH
- [ ] CLIENT PAUSE
Expand Down

0 comments on commit 4807117

Please sign in to comment.