diff --git a/app/roster/src/application/server/cmd/client/list.rs b/app/roster/src/application/server/cmd/client/list.rs index 2e761b3..2d3c532 100644 --- a/app/roster/src/application/server/cmd/client/list.rs +++ b/app/roster/src/application/server/cmd/client/list.rs @@ -1,4 +1,5 @@ use anyhow::bail; +use bytestring::ByteString; use super::super::parse::Parse; use crate::application::server::cmd::parse::ParseError; @@ -99,14 +100,26 @@ impl ClientList { dst: &mut WriteConnection, ctx: Context, ) -> anyhow::Result<()> { - /* - let id = ctx.connection.id(); + let connections = ctx.supervisor.get_normal_connection().await; - let response = Frame::Integer(id); + // TODO(@miaxos): lot of things missing here + let conn_frames = connections + .into_iter() + .map(|x| { + ByteString::from(format!( + "id={id} addr={addr} laddr={laddr} fd={fd}", + id = &x.id, + addr = &x.addr, + laddr = &x.laddr, + fd = &x.fd + )) + }) + .map(Frame::Simple) + .collect(); + + let response = Frame::Array(conn_frames); dst.write_frame(&response).await?; Ok(()) - */ - unimplemented!() } } diff --git a/app/roster/src/application/server/server_thread.rs b/app/roster/src/application/server/server_thread.rs index 064b31b..4e184d7 100644 --- a/app/roster/src/application/server/server_thread.rs +++ b/app/roster/src/application/server/server_thread.rs @@ -1,3 +1,4 @@ +use std::os::fd::AsRawFd; use std::rc::Rc; use std::thread::JoinHandle; @@ -79,16 +80,21 @@ impl ServerMonoThreadedHandle { let shard = shard.clone(); // We accept the TCP Connection - let (conn, _addr) = listener + let (conn, addr) = listener .accept() .await .expect("Unable to accept connections"); - let meta_conn = self.supervisor.assign_new_connection(); + let laddr = conn.local_addr()?; + let fd = conn.as_raw_fd(); + + let meta_conn = + self.supervisor.assign_new_connection(addr, laddr, fd); let supervisor = self.supervisor.clone(); conn.set_nodelay(true).unwrap(); - let ctx = Context::new(storage, supervisor, meta_conn); + let ctx = + Context::new(storage, supervisor, meta_conn.clone()); // We map it to an `Handler` which is able to understand // the Redis protocol @@ -108,6 +114,8 @@ impl ServerMonoThreadedHandle { // error!(?err); panic!("blbl"); } + + meta_conn.stop(); // handler.connection.stop().await.unwrap(); }); } diff --git a/app/roster/src/application/server/supervisor/mod.rs b/app/roster/src/application/server/supervisor/mod.rs index fbd6f33..ee77a3d 100644 --- a/app/roster/src/application/server/supervisor/mod.rs +++ b/app/roster/src/application/server/supervisor/mod.rs @@ -1,4 +1,5 @@ -use std::sync::atomic::AtomicU64; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::Arc; use scc::HashMap; @@ -37,16 +38,51 @@ impl Supervisor { /// Assign a new connection to the [Supervisor] and return a /// [MetadataConnection] - pub fn assign_new_connection(&self) -> Arc { + pub fn assign_new_connection( + &self, + addr: SocketAddr, + laddr: SocketAddr, + fd: i32, + ) -> Arc { let id = self .current_id .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let conn = Arc::new(MetadataConnection { id }); + let conn = Arc::new(MetadataConnection { + id, + kind: MetadataConnectionKind::NORMAL, + stopped: AtomicBool::new(false), + addr, + laddr, + fd, + }); self.current_connections .insert(id, conn.clone()) .expect("Can't fail"); conn } + + /// Get the list of [MetadataConnection] for normal connection. + pub async fn get_normal_connection(&self) -> Vec> { + let mut result = Vec::new(); + + self.current_connections + .scan_async(|_, conn| { + let stopped = + conn.stopped.load(std::sync::atomic::Ordering::Relaxed); + + if !stopped && conn.kind == MetadataConnectionKind::NORMAL { + result.push(conn.clone()); + } + }) + .await; + + result + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum MetadataConnectionKind { + NORMAL, } /// [MetadataConnection] is where we store metadata about a Connection. @@ -54,6 +90,24 @@ impl Supervisor { pub struct MetadataConnection { /// The associated ID for the connection pub id: u64, + /// Describe the connection kind + pub kind: MetadataConnectionKind, + /// Tell if the connection is stopped + pub stopped: AtomicBool, + /// Address/Port of the client + pub addr: SocketAddr, + /// address/port of local address client connected to (bind address) + pub laddr: SocketAddr, + /// file descriptor corresponding to the socket + pub fd: i32, +} + +impl MetadataConnection { + /// Indicate this connection is stopped. + pub fn stop(&self) { + self.stopped + .store(true, std::sync::atomic::Ordering::Relaxed); + } } impl MetadataConnection { diff --git a/docs/cmd_list.md b/docs/cmd_list.md index 59b2da5..b54890f 100644 --- a/docs/cmd_list.md +++ b/docs/cmd_list.md @@ -40,6 +40,8 @@ - [ ] CLIENT INFO - [ ] CLIENT KILL - [ ] CLIENT LIST + A lot is missing right now but it's partially working, not every arguments are + supported yet. - [ ] CLIENT NO EVICT - [ ] CLIENT NO TOUCH - [ ] CLIENT PAUSE