From c604697352da62125da19eb72234338216d1baa0 Mon Sep 17 00:00:00 2001 From: fluxie Date: Sat, 23 Dec 2023 10:23:46 +0200 Subject: [PATCH 1/6] Added area for the client callstack to the UI The area becomes visible if the client included both the process id and the thread id of the calling thread in the headers of a request. Support for capturing and displaying the callstack will be added in another commit. --- src/ui/sub_views/details_pane.rs | 38 +++++++++- src/ui/views.rs | 3 + src/ui/views/callstack_view.rs | 125 +++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 src/ui/views/callstack_view.rs diff --git a/src/ui/sub_views/details_pane.rs b/src/ui/sub_views/details_pane.rs index 9827382..45bc0d7 100644 --- a/src/ui/sub_views/details_pane.rs +++ b/src/ui/sub_views/details_pane.rs @@ -1,3 +1,4 @@ +use std::convert::TryFrom; use tui::layout::{Constraint, Direction, Layout}; use tui::text::{Span, Spans, Text}; use tui::widgets::Paragraph; @@ -6,7 +7,7 @@ use uuid::Uuid; use crate::ui::prelude::*; use crate::session::{EncodedRequest, RequestPart}; -use crate::ui::views::MessageView; +use crate::ui::views::{CallstackView, ClientThreadId, MessageView}; #[derive(Clone, Default)] pub struct DetailsPane; @@ -22,6 +23,7 @@ impl DetailsPane match key.code { KeyCode::Char('q') => self.create_message_view(req, RequestPart::Request), KeyCode::Char('e') => self.create_message_view(req, RequestPart::Response), + KeyCode::Char('s') => self.create_callstack_view(req), _ => None, } } else { @@ -61,11 +63,20 @@ impl DetailsPane c.x -= 1; c.width += 2; c.height += 1; + let vertical_chunks: Vec = if ClientThreadId::try_from(&request.request_msg).is_ok() { + Layout::default() + .direction(Direction::Vertical) + .margin(0) + .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) + .split(block.inner(c)) + } else { + Vec::from([block.inner(c)]) + }; let req_resp_chunks = Layout::default() .direction(Direction::Horizontal) .margin(0) .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref()) - .split(block.inner(c)); + .split(vertical_chunks[0]); f.render_widget(block, chunk); @@ -114,6 +125,16 @@ impl DetailsPane offset: 0, } .draw(ctx, f, req_resp_chunks[1]); + + // The right side view is split vertically only if the client included its process id and thread id in the request + // enabling the callstack capture. + if vertical_chunks.len() > 1 { + CallstackView { + request: request.request_data.uuid, + offset: 0, + } + .draw(ctx, f, vertical_chunks[1]); + } } fn create_message_view( @@ -128,4 +149,17 @@ impl DetailsPane offset: 0, }))) } + + fn create_callstack_view(&mut self, req: &EncodedRequest) + -> Option> + { + if ClientThreadId::try_from(&req.request_msg).is_ok() { + Some(HandleResult::PushView(Box::new(CallstackView { + request: req.request_data.uuid, + offset: 0, + }))) + } else { + None + } + } } diff --git a/src/ui/views.rs b/src/ui/views.rs index b319503..f2c76c2 100644 --- a/src/ui/views.rs +++ b/src/ui/views.rs @@ -6,6 +6,9 @@ pub use main_view::MainView; mod message_view; pub use message_view::MessageView; +mod callstack_view; +pub use callstack_view::{CallstackView, ClientThreadId}; + pub trait View { fn draw(&mut self, ctx: &UiContext, f: &mut Frame, chunk: Rect); diff --git a/src/ui/views/callstack_view.rs b/src/ui/views/callstack_view.rs new file mode 100644 index 0000000..056c03d --- /dev/null +++ b/src/ui/views/callstack_view.rs @@ -0,0 +1,125 @@ +use super::prelude::*; +use crossterm::event::KeyCode; +use http::HeaderValue; +use std::convert::TryFrom; +use tui::widgets::{Paragraph, Wrap}; +use uuid::Uuid; + +use crate::session::MessageData; + +/// When available, identifies the thread in the calling or client process. +/// The client should reports its process id with the proxide-client-process-id" header and +/// the thread id with the "proxide-client-thread-id" header. +/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide +/// and the client are running on the same host. +pub struct ClientThreadId +{ + process_id: u32, + thread_id: i64, +} + +pub struct CallstackView +{ + pub request: Uuid, + pub offset: u16, +} + +impl CallstackView {} + +impl View for CallstackView +{ + fn draw(&mut self, ctx: &UiContext, f: &mut Frame, chunk: Rect) + { + let request = match ctx.data.requests.get_by_uuid(self.request) { + Some(r) => r, + None => return, + }; + + let client_thread = match ClientThreadId::try_from(&request.request_msg) { + Ok(thread_id) => thread_id, + Err(_) => return, + }; + + let title = format!( + "Client call[s]tack, Process: {}, Thread: {}", + client_thread.process_id, client_thread.thread_id + ); + let block = create_block(&title); + let request_data = Paragraph::new("Unimplemented.") + .block(block) + .wrap(Wrap { trim: false }) + .scroll((self.offset, 0)); + f.render_widget(request_data, chunk); + } + + fn on_input(&mut self, _ctx: &UiContext, e: &CTEvent, size: Rect) -> Option> + { + match e { + CTEvent::Key(key) => match key.code { + KeyCode::Char('k') | KeyCode::Up => self.offset = self.offset.saturating_sub(1), + KeyCode::Char('j') | KeyCode::Down => self.offset = self.offset.saturating_add(1), + KeyCode::PageDown => self.offset = self.offset.saturating_add(size.height - 5), + KeyCode::PageUp => self.offset = self.offset.saturating_sub(size.height - 5), + KeyCode::F(12) => { + return None; + } + _ => return None, + }, + _ => return None, + }; + Some(HandleResult::Update) + } + + fn on_change(&mut self, _ctx: &UiContext, change: &SessionChange) -> bool + { + match change { + SessionChange::NewConnection { .. } => false, + SessionChange::Connection { .. } => false, + SessionChange::NewRequest { .. } => false, + SessionChange::Request { .. } => false, + SessionChange::NewMessage { .. } => false, + SessionChange::Message { .. } => false, + } + } + + fn help_text(&self, _state: &UiContext, _size: Rect) -> String + { + format!( + "{}\n{}", + "[Up/Down, j/k, PgUp/PgDn]: Scroll; [F12]: Export to file", "[Esc]: Back to main view" + ) + } +} + +impl TryFrom<&MessageData> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &MessageData) -> Result + { + let process_id: Option = + number_or_none(&value.headers.get("proxide-client-process-id")); + let thread_id: Option = number_or_none(&value.headers.get("proxide-client-thread-id")); + match (process_id, thread_id) { + (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { + process_id, + thread_id, + }), + _ => Err(()), + } + } +} + +fn number_or_none(header: &Option<&HeaderValue>) -> Option +where + N: std::str::FromStr, +{ + if let Some(value) = header { + value + .to_str() + .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) + .unwrap_or(None) + } else { + None + } +} From 77759c8ffa6171dcf686b78f86344eebaf27f4fb Mon Sep 17 00:00:00 2001 From: fluxie Date: Sat, 23 Dec 2023 15:41:04 +0200 Subject: [PATCH 2/6] Converted unwrap to expect Proxide panics when launched from RustRover. I tried to locate the source of the panic. Though it always seemed to come from the internals of the Rust main runtime. Launching the application in debug mode does not trigger the panic. The application does not panic if a pause of 500 milliseconds is added before the "let chunk = f.size();" gets executed in marcos.rs:draw_views function. The pause can be anywhere between the first line in main() and the function mentioned above. unwrap -> expect conversions were done while hunting for this issue. --- src/ui.rs | 3 +-- src/ui/state.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ui.rs b/src/ui.rs index 4023de5..9f85a50 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -92,8 +92,7 @@ pub fn main( state.draw(&mut terminal).context(IoError {})?; let mut redraw_pending = false; loop { - let e = ui_rx.recv().unwrap(); - + let e = ui_rx.recv().expect( "Receiving UI events failed."); if let UiEvent::Redraw = e { redraw_pending = false; state.draw(&mut terminal).context(IoError {})?; diff --git a/src/ui/state.rs b/src/ui/state.rs index 92248a9..fab4eaa 100644 --- a/src/ui/state.rs +++ b/src/ui/state.rs @@ -257,7 +257,7 @@ impl ProxideUi let help_text = if let Some(cmd) = &self.input_command { format!("{}\n{}{}", cmd.help, cmd.prompt, cmd.input) } else { - let view = self.ui_stack.last_mut().unwrap(); + let view = self.ui_stack.last_mut().expect("Empty UI stack."); view.help_text(&self.context, self.context.size) }; From 7e3d3587f7fbd01ea0a7a136fd92e76ed313cc0d Mon Sep 17 00:00:00 2001 From: fluxie Date: Sat, 23 Dec 2023 17:00:11 +0200 Subject: [PATCH 3/6] Implemented callstack capture task Currently the task in htt2p.rs only reports that capturing the callstack is not supported on any operating system. But now with the asynchronously executed capture task it will be possible to actually implement the capturing. --- src/connection.rs | 68 ++++++++++++++++++++++++++ src/connection/http2.rs | 82 ++++++++++++++++++++++++++++++++ src/main.rs | 64 +++++++++++++++++++++++++ src/session.rs | 8 ++++ src/session/events.rs | 28 +++++++++++ src/ui.rs | 2 +- src/ui/sub_views/details_pane.rs | 23 ++++----- src/ui/views.rs | 2 +- src/ui/views/callstack_view.rs | 63 +++++------------------- src/ui/views/main_view.rs | 1 + src/ui/views/message_view.rs | 1 + 11 files changed, 279 insertions(+), 63 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 443a052..3003e62 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,4 +1,6 @@ +use http::{HeaderMap, HeaderValue}; use snafu::{ResultExt, Snafu}; +use std::convert::TryFrom; use std::net::SocketAddr; use std::sync::mpsc::Sender; use std::sync::Arc; @@ -124,6 +126,58 @@ impl Streams } } +/// When available, identifies the thread in the calling or client process. +/// The client should reports its process id with the proxide-client-process-id" header and +/// the thread id with the "proxide-client-thread-id" header. +/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide +/// and the client are running on the same host. +pub struct ClientThreadId +{ + process_id: u32, + thread_id: i64, +} + +impl ClientThreadId +{ + pub fn process_id(&self) -> u32 + { + self.process_id + } + + pub fn thread_id(&self) -> i64 + { + self.thread_id + } +} + +impl TryFrom<&MessageData> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &MessageData) -> std::result::Result + { + ClientThreadId::try_from(&value.headers) + } +} + +impl TryFrom<&HeaderMap> for ClientThreadId +{ + type Error = (); + + fn try_from(value: &HeaderMap) -> std::result::Result + { + let process_id: Option = number_or_none(&value.get("proxide-client-process-id")); + let thread_id: Option = number_or_none(&value.get("proxide-client-thread-id")); + match (process_id, thread_id) { + (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { + process_id, + thread_id, + }), + _ => Err(()), + } + } +} + /// Handles a single client connection. /// /// The connection handling is split into multiple functions, but the functions are chained in a @@ -311,3 +365,17 @@ where log::info!("Exit"); }); } + +fn number_or_none(header: &Option<&HeaderValue>) -> Option +where + N: std::str::FromStr, +{ + if let Some(value) = header { + value + .to_str() + .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) + .unwrap_or(None) + } else { + None + } +} diff --git a/src/connection/http2.rs b/src/connection/http2.rs index 512b85b..c5a1c5f 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -8,10 +8,14 @@ use h2::{ use http::{HeaderMap, Request, Response}; use log::error; use snafu::ResultExt; +use std::convert::TryFrom; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::mpsc::Sender; +use std::task::{Context, Poll}; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; use super::*; @@ -141,6 +145,12 @@ pub struct ProxyRequest client_response: SendResponse, server_request: SendStream, server_response: ResponseFuture, + request_processor: ProcessingFuture, +} + +struct ProcessingFuture +{ + inner: JoinHandle<()>, } impl ProxyRequest @@ -191,6 +201,10 @@ impl ProxyRequest })) .unwrap(); + // Request processor supports asynchronous message processing while the proxide is busy proxying data between + // the client and the server. + let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui); + let server_request = Request::from_parts(client_head, ()); // Set up a server request. @@ -208,6 +222,7 @@ impl ProxyRequest client_response, server_request, server_response, + request_processor, }) } @@ -265,6 +280,7 @@ impl ProxyRequest let mut client_response = self.client_response; let server_response = self.server_response; let connection_uuid = self.connection_uuid; + let request_processor = self.request_processor; let ui_temp = ui.clone(); let response_future = async move { let ui = ui_temp; @@ -293,6 +309,11 @@ impl ProxyRequest scenario: "sending response", })?; + // Ensure the request processor has finished before we send the response to the client. + // Callstack capturing process inside the request processor may capture incorrect data if + // the client is given the final answer from the server as it no longer has to wait for the response. + request_processor.await; + // The server might have sent all the details in the headers, at which point there is // no body present. Check for this scenario here. if response_body.is_end_stream() { @@ -440,3 +461,64 @@ fn is_fatal_error(r: &Result) -> bool }, } } + +impl ProcessingFuture +{ + fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender) -> Self + { + let mut tasks: JoinSet>> = + JoinSet::new(); + + // Task which attempts to capture client's callstack. + if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) { + let ui_clone = ui.clone(); + tasks.spawn(ProcessingFuture::capture_client_callstack( + uuid, thread_id, ui_clone, + )); + } + + Self { + inner: tokio::spawn(async move { + while let Some(result) = tasks.join_next().await { + match result { + Ok(_) => {} + Err(e) => { + // TODO: Send the error to UI. + eprintln!("{}", e); + error!("{}", e); + } + } + } + }), + } + } + + async fn capture_client_callstack( + uuid: Uuid, + _client_thread_id: ClientThreadId, + ui: Sender, + ) -> std::result::Result<(), Box> + { + // TODO: Try to capture the callstack + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Unsupported, + }, + ))?; + Ok(()) + } +} + +impl Future for ProcessingFuture +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll + { + match Pin::new(&mut self.inner).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/main.rs b/src/main.rs index 5f37324..c373346 100644 --- a/src/main.rs +++ b/src/main.rs @@ -419,6 +419,7 @@ mod test use log::SetLoggerError; use serial_test::serial; use std::io::{ErrorKind, Write}; + use std::ops::Add; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -426,6 +427,7 @@ mod test use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::oneshot; + use tokio::time::Instant; use crate::session::events::SessionEvent; use crate::ConnectionOptions; @@ -533,6 +535,68 @@ mod test .expect("Waiting for proxide to stop failed."); } + #[tokio::test] + #[serial] + async fn proxide_receives_client_callstack_ui_message() + { + // Logging must be enabled to detect errors inside proxide. + // Failure to monitor logs may cause the test to hang as errors that stop processing get silently ignored. + let mut error_monitor = get_error_monitor().expect("Acquiring error monitor failed."); + + // Server + let server = GrpcServer::start() + .await + .expect("Starting test server failed."); + + // Proxide + let options = get_proxide_options(&server); + let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>(); + let (ui_tx, ui_rx_std) = std::sync::mpsc::channel(); + let proxide_port = u16::from_str(&options.listen_port.to_string()).unwrap(); + let proxide = tokio::spawn(crate::launch_proxide(options, abort_rx, ui_tx)); + + // Message generator and tester. + let tester = grpc_tester::GrpcTester::with_proxide( + server, + proxide_port, + grpc_tester::Args { + period: std::time::Duration::from_secs(0), + tasks: 1, + }, + ) + .await + .expect("Starting tester failed."); + let mut message_rx = async_from_sync(ui_rx_std); + + // UI channel should be constantly receiving client callstack events. + // The generator includes the process id and the thread id in the messages it sends. + let mut client_callstack_received = false; + let timeout_at = Instant::now().add(Duration::from_secs(30)); + while let Some(message) = tokio::select! { + result = message_rx.recv() => result, + _t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ), + error = error_monitor.recv() => panic!( "{:?}", error ), + } { + if let SessionEvent::ClientCallstackProcessed(..) = message { + client_callstack_received = true; + break; + } else if Instant::now() > timeout_at { + panic!("Timeout") + } + } + + // Ensure the ui channel was not closed prematurely. + assert!(client_callstack_received); + + let mut server = tester.stop_generator().expect("Stopping generator failed."); + abort_tx.send(()).expect("Stopping proxide failed."); + proxide + .await + .expect("Waiting for proxide to stop failed.") + .expect("Waiting for proxide to stop failed."); + server.stop().expect("Stopping server failed"); + } + /// Gets options for launching proxide. fn get_proxide_options(server: &GrpcServer) -> Arc { diff --git a/src/session.rs b/src/session.rs index 83f3ce1..971a2ed 100644 --- a/src/session.rs +++ b/src/session.rs @@ -56,6 +56,7 @@ pub struct RequestData pub start_timestamp: DateTime, pub end_timestamp: Option>, + pub client_callstack: Option, pub status: Status, } @@ -126,6 +127,13 @@ impl MessageData } } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum ClientCallstack +{ + /// Proxide does not support callstack capture on the current platform/operating system. + Unsupported, +} + impl IndexedVec { pub fn push(&mut self, uuid: Uuid, item: T) diff --git a/src/session/events.rs b/src/session/events.rs index 2334d04..7a77242 100644 --- a/src/session/events.rs +++ b/src/session/events.rs @@ -14,6 +14,7 @@ pub enum SessionEvent MessageDone(MessageDoneEvent), RequestDone(RequestDoneEvent), ConnectionDone(ConnectionDoneEvent), + ClientCallstackProcessed(ClientCallstackProcessedEvent), } #[derive(Serialize, Deserialize, Debug)] @@ -84,6 +85,13 @@ pub struct ConnectionDoneEvent pub timestamp: SystemTime, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ClientCallstackProcessedEvent +{ + pub uuid: Uuid, + pub callstack: ClientCallstack, +} + pub enum SessionChange { NewConnection @@ -110,6 +118,10 @@ pub enum SessionChange { connection: Uuid }, + Callstack + { + request: Uuid + }, } impl Session @@ -124,6 +136,7 @@ impl Session SessionEvent::MessageDone(e) => self.on_message_done(e), SessionEvent::RequestDone(e) => self.on_request_done(e), SessionEvent::ConnectionDone(e) => self.on_connection_done(e), + SessionEvent::ClientCallstackProcessed(e) => self.on_client_callstack_processed(e), } } @@ -154,6 +167,7 @@ impl Session status: Status::InProgress, start_timestamp: e.timestamp.into(), end_timestamp: None, + client_callstack: None, }, request_msg: MessageData::new(RequestPart::Request) .with_headers(e.headers) @@ -247,4 +261,18 @@ impl Session vec![] } } + + fn on_client_callstack_processed( + &mut self, + e: ClientCallstackProcessedEvent, + ) -> Vec + { + let request = self.requests.get_mut_by_uuid(e.uuid); + if let Some(request) = request { + request.request_data.client_callstack = Some(e.callstack); + vec![SessionChange::Callstack { request: e.uuid }] + } else { + vec![] + } + } } diff --git a/src/ui.rs b/src/ui.rs index 9f85a50..24ddd55 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -92,7 +92,7 @@ pub fn main( state.draw(&mut terminal).context(IoError {})?; let mut redraw_pending = false; loop { - let e = ui_rx.recv().expect( "Receiving UI events failed."); + let e = ui_rx.recv().expect("Receiving UI events failed."); if let UiEvent::Redraw = e { redraw_pending = false; state.draw(&mut terminal).context(IoError {})?; diff --git a/src/ui/sub_views/details_pane.rs b/src/ui/sub_views/details_pane.rs index 45bc0d7..d0c0a75 100644 --- a/src/ui/sub_views/details_pane.rs +++ b/src/ui/sub_views/details_pane.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::ui::prelude::*; use crate::session::{EncodedRequest, RequestPart}; -use crate::ui::views::{CallstackView, ClientThreadId, MessageView}; +use crate::ui::views::{CallstackView, MessageView}; #[derive(Clone, Default)] pub struct DetailsPane; @@ -63,15 +63,16 @@ impl DetailsPane c.x -= 1; c.width += 2; c.height += 1; - let vertical_chunks: Vec = if ClientThreadId::try_from(&request.request_msg).is_ok() { - Layout::default() - .direction(Direction::Vertical) - .margin(0) - .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) - .split(block.inner(c)) - } else { - Vec::from([block.inner(c)]) - }; + let vertical_chunks: Vec = + if crate::connection::ClientThreadId::try_from(&request.request_msg).is_ok() { + Layout::default() + .direction(Direction::Vertical) + .margin(0) + .constraints([Constraint::Percentage(80), Constraint::Percentage(20)].as_ref()) + .split(block.inner(c)) + } else { + Vec::from([block.inner(c)]) + }; let req_resp_chunks = Layout::default() .direction(Direction::Horizontal) .margin(0) @@ -153,7 +154,7 @@ impl DetailsPane fn create_callstack_view(&mut self, req: &EncodedRequest) -> Option> { - if ClientThreadId::try_from(&req.request_msg).is_ok() { + if crate::connection::ClientThreadId::try_from(&req.request_msg).is_ok() { Some(HandleResult::PushView(Box::new(CallstackView { request: req.request_data.uuid, offset: 0, diff --git a/src/ui/views.rs b/src/ui/views.rs index f2c76c2..5f88b49 100644 --- a/src/ui/views.rs +++ b/src/ui/views.rs @@ -7,7 +7,7 @@ mod message_view; pub use message_view::MessageView; mod callstack_view; -pub use callstack_view::{CallstackView, ClientThreadId}; +pub use callstack_view::CallstackView; pub trait View { diff --git a/src/ui/views/callstack_view.rs b/src/ui/views/callstack_view.rs index 056c03d..ed843a9 100644 --- a/src/ui/views/callstack_view.rs +++ b/src/ui/views/callstack_view.rs @@ -1,23 +1,10 @@ use super::prelude::*; +use crate::session::ClientCallstack; use crossterm::event::KeyCode; -use http::HeaderValue; use std::convert::TryFrom; use tui::widgets::{Paragraph, Wrap}; use uuid::Uuid; -use crate::session::MessageData; - -/// When available, identifies the thread in the calling or client process. -/// The client should reports its process id with the proxide-client-process-id" header and -/// the thread id with the "proxide-client-thread-id" header. -/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide -/// and the client are running on the same host. -pub struct ClientThreadId -{ - process_id: u32, - thread_id: i64, -} - pub struct CallstackView { pub request: Uuid, @@ -35,17 +22,25 @@ impl View for CallstackView None => return, }; - let client_thread = match ClientThreadId::try_from(&request.request_msg) { + let client_thread = match crate::connection::ClientThreadId::try_from(&request.request_msg) + { Ok(thread_id) => thread_id, Err(_) => return, }; let title = format!( "Client call[s]tack, Process: {}, Thread: {}", - client_thread.process_id, client_thread.thread_id + client_thread.process_id(), + client_thread.thread_id() ); + let message = match request.request_data.client_callstack { + Some(ClientCallstack::Unsupported) => { + "Callstack unavailable:\n* Unsupported operating system." + } + None => ".. (Pending)", + }; let block = create_block(&title); - let request_data = Paragraph::new("Unimplemented.") + let request_data = Paragraph::new(message) .block(block) .wrap(Wrap { trim: false }) .scroll((self.offset, 0)); @@ -79,6 +74,7 @@ impl View for CallstackView SessionChange::Request { .. } => false, SessionChange::NewMessage { .. } => false, SessionChange::Message { .. } => false, + SessionChange::Callstack { request } => *request == self.request, } } @@ -90,36 +86,3 @@ impl View for CallstackView ) } } - -impl TryFrom<&MessageData> for ClientThreadId -{ - type Error = (); - - fn try_from(value: &MessageData) -> Result - { - let process_id: Option = - number_or_none(&value.headers.get("proxide-client-process-id")); - let thread_id: Option = number_or_none(&value.headers.get("proxide-client-thread-id")); - match (process_id, thread_id) { - (Some(process_id), Some(thread_id)) => Ok(ClientThreadId { - process_id, - thread_id, - }), - _ => Err(()), - } - } -} - -fn number_or_none(header: &Option<&HeaderValue>) -> Option -where - N: std::str::FromStr, -{ - if let Some(value) = header { - value - .to_str() - .map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None)) - .unwrap_or(None) - } else { - None - } -} diff --git a/src/ui/views/main_view.rs b/src/ui/views/main_view.rs index 7539f89..870952b 100644 --- a/src/ui/views/main_view.rs +++ b/src/ui/views/main_view.rs @@ -132,6 +132,7 @@ impl View for MainView .selected(&ctx.data.requests) .map(|r| r.request_data.uuid == *req) .unwrap_or(false), + SessionChange::Callstack { .. } => false, } } diff --git a/src/ui/views/message_view.rs b/src/ui/views/message_view.rs index 62070f0..e528147 100644 --- a/src/ui/views/message_view.rs +++ b/src/ui/views/message_view.rs @@ -155,6 +155,7 @@ impl View for MessageView | SessionChange::Message { request, part } => { *part == self.part && *request == self.request } + SessionChange::Callstack { .. } => false, } } From 681b6313274c5d1f9550cbbf68aab9c39ebb411e Mon Sep 17 00:00:00 2001 From: fluxie Date: Mon, 15 Jan 2024 14:15:38 +0200 Subject: [PATCH 4/6] Implemented callstack capture flow for a unit test This adds parts of the infrastructure required to display the callstack in the ui. And adds a test to verify the basic capturing concept works. The implementation requires utilizing rstack_self crate to the capture which in turn required helper crates to wrap its functionality for ease of use. --- .github/workflows/ci.yml | 9 + .gitignore | 2 +- .idea/proxide.iml | 4 + Cargo.lock | 120 ++++++++++++ Cargo.toml | 7 + src/connection/http2.rs | 127 +++++++++++-- src/main.rs | 33 +++- src/session.rs | 17 ++ src/session/callstack.rs | 131 ++++++++++++++ src/ui/toast.rs | 4 +- src/ui/views/callstack_view.rs | 31 +++- test/rstack-child/Cargo.lock | 260 ++++++++++++++++++++++++++ test/rstack-child/Cargo.toml | 9 + test/rstack-child/src/main.rs | 13 ++ test/rstack-launcher/Cargo.lock | 312 ++++++++++++++++++++++++++++++++ test/rstack-launcher/Cargo.toml | 15 ++ test/rstack-launcher/build.rs | 36 ++++ test/rstack-launcher/src/lib.rs | 93 ++++++++++ 18 files changed, 1200 insertions(+), 23 deletions(-) create mode 100644 src/session/callstack.rs create mode 100644 test/rstack-child/Cargo.lock create mode 100644 test/rstack-child/Cargo.toml create mode 100644 test/rstack-child/src/main.rs create mode 100644 test/rstack-launcher/Cargo.lock create mode 100644 test/rstack-launcher/Cargo.toml create mode 100644 test/rstack-launcher/build.rs create mode 100644 test/rstack-launcher/src/lib.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f6f2300..ec6e13b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,11 @@ jobs: - name: Install Protoc uses: arduino/setup-protoc@v2 + - name: Install libunwind-dev + uses: ConorMacBride/install-package@v1 + with: + apt: libunwind-dev + - name: Environment run: | cargo --version @@ -69,6 +74,10 @@ jobs: - name: Install Protoc uses: arduino/setup-protoc@v2 + - name: Install libunwind-dev + run: | + sudo apt update && sudo apt install -y libunwind-dev + - name: Environment run: | cargo --version diff --git a/.gitignore b/.gitignore index 7d4d3e2..324ecc2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/target +**/target **/*.rs.bk .vs packages diff --git a/.idea/proxide.iml b/.idea/proxide.iml index 1651999..9828d78 100644 --- a/.idea/proxide.iml +++ b/.idea/proxide.iml @@ -4,8 +4,12 @@ + + + + diff --git a/Cargo.lock b/Cargo.lock index 5a1c41f..fe059ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + [[package]] name = "anyhow" version = "1.0.75" @@ -229,6 +235,15 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -557,6 +572,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "escargot" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "768064bd3a0e2bedcba91dc87ace90beea91acc41b6a01a3ca8e9aa8827461bf" +dependencies = [ + "log", + "once_cell", + "serde", + "serde_json", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -575,6 +602,33 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.40", +] + +[[package]] +name = "foreign-types-shared" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" + [[package]] name = "futures" version = "0.3.29" @@ -1248,6 +1302,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" + [[package]] name = "portpicker" version = "0.1.1" @@ -1371,6 +1431,9 @@ dependencies = [ "protofish", "rcgen", "rmp-serde", + "rstack", + "rstack-launcher", + "rstack-self", "rustls", "serde", "serde_json", @@ -1526,6 +1589,42 @@ dependencies = [ "serde", ] +[[package]] +name = "rstack" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7df9d3ebd4f17b52e6134efe2fa20021c80688cbe823d481a729a993b730493" +dependencies = [ + "cfg-if", + "libc", + "log", + "unwind", +] + +[[package]] +name = "rstack-launcher" +version = "0.1.0" +dependencies = [ + "escargot", + "os-id", + "rstack-self", +] + +[[package]] +name = "rstack-self" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd5030da3aba0ec731502f74ec38e63798eea6bc8b8ba5972129afe3eababd2" +dependencies = [ + "antidote", + "backtrace", + "bincode", + "lazy_static", + "libc", + "rstack", + "serde", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2108,6 +2207,27 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unwind" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38290439f8459ba56c4bf15fc776463f495fefc4f0112f87a1a075540441b083" +dependencies = [ + "foreign-types", + "libc", + "unwind-sys", +] + +[[package]] +name = "unwind-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a81ba64bc45243d442e9bb2a362f303df152b5078c56ce4a0dc7d813c8df91" +dependencies = [ + "libc", + "pkg-config", +] + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index df08256..2969c79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,11 +41,18 @@ wildmatch = "1" glob = "0.3" shell-words = "1" +[target.'cfg(unix)'.dependencies] +rstack = "0.3.3" + [dev-dependencies] portpicker = "0.1.1" grpc-tester = { version = "0.1.0", path = "test/rust_grpc"} serial_test = "2.0.0" lazy_static = "1.4.0" +[target.'cfg(unix)'.dev-dependencies] +rstack-self = "0.3.0" +rstack-launcher = { version = "0.1.0", path = "test/rstack-launcher" } + [profile.release] debug = true \ No newline at end of file diff --git a/src/connection/http2.rs b/src/connection/http2.rs index c5a1c5f..cf18658 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -15,6 +15,7 @@ use std::sync::mpsc::Sender; use std::task::{Context, Poll}; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError}; use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; @@ -91,6 +92,7 @@ where // The client_connection will produce individual HTTP request that we'll accept. // These requests will be handled in parallel by spawning them into their own // tasks. + let processing_control = ProcessingControl::new(); while let Some(request) = client_connection.accept().await { let (client_request, client_response) = request.context(H2Error {}).context(ClientError { @@ -104,6 +106,7 @@ where client_request, client_response, server_stream, + processing_control.clone(), &ui, )?; @@ -148,6 +151,12 @@ pub struct ProxyRequest request_processor: ProcessingFuture, } +/// Manages the asynchronous auxiliary processing of requests. +struct ProcessingControl +{ + callstack_capture_limiter: Semaphore, +} + struct ProcessingFuture { inner: JoinHandle<()>, @@ -155,12 +164,13 @@ struct ProcessingFuture impl ProxyRequest { - pub fn new( + fn new( connection_uuid: Uuid, authority: Option, client_request: Request, client_response: SendResponse, server_stream: &mut client::SendRequest, + processing_control: Arc, ui: &Sender, ) -> Result { @@ -203,7 +213,7 @@ impl ProxyRequest // Request processor supports asynchronous message processing while the proxide is busy proxying data between // the client and the server. - let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui); + let request_processor = ProcessingFuture::spawn(uuid, &client_head, processing_control, ui); let server_request = Request::from_parts(client_head, ()); @@ -464,7 +474,12 @@ fn is_fatal_error(r: &Result) -> bool impl ProcessingFuture { - fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender) -> Self + fn spawn( + uuid: Uuid, + client_head: &http::request::Parts, + processing_control: Arc, + ui: &Sender, + ) -> Self { let mut tasks: JoinSet>> = JoinSet::new(); @@ -473,7 +488,10 @@ impl ProcessingFuture if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) { let ui_clone = ui.clone(); tasks.spawn(ProcessingFuture::capture_client_callstack( - uuid, thread_id, ui_clone, + uuid, + thread_id, + processing_control.clone(), + ui_clone, )); } @@ -495,18 +513,35 @@ impl ProcessingFuture async fn capture_client_callstack( uuid: Uuid, - _client_thread_id: ClientThreadId, + client_thread_id: ClientThreadId, + processing_control: Arc, ui: Sender, ) -> std::result::Result<(), Box> { - // TODO: Try to capture the callstack - ui.send(SessionEvent::ClientCallstackProcessed( - ClientCallstackProcessedEvent { - uuid, - callstack: ClientCallstack::Unsupported, - }, - ))?; - Ok(()) + // Capturing the callstacks is a very expensive operation + // Capturing is throttled with the semaphore in processing_control + match processing_control.try_request_capture_callstack_permit() { + Ok(_) => { + // TODO Callstack capture: Add support for other operating systems. + #[cfg(target_os = "linux")] + capture_client_callstack_rstack(uuid, client_thread_id, ui).await?; + + #[cfg(not(target_os = "linux"))] + capture_client_callstack_unsupported(uuid, client_thread_id, ui).await?; + + Ok(()) + } + Err(TryAcquireError::NoPermits) => { + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Throttled, + }, + ))?; + Ok(()) + } + Err(e) => Err(Box::from(e)), + } } } @@ -522,3 +557,69 @@ impl Future for ProcessingFuture } } } + +impl ProcessingControl +{ + fn new() -> Arc + { + let parallel_callstack_capture_limit = if cfg!(not(test)) { 5 } else { 1 }; + Arc::new(Self { + callstack_capture_limiter: Semaphore::new(parallel_callstack_capture_limit), + }) + } + + /// Requests permissions to capture a cleint callstack. + fn try_request_capture_callstack_permit(&self) -> Result, TryAcquireError> + { + self.callstack_capture_limiter.try_acquire() + } +} + +#[cfg(target_os = "linux")] +async fn capture_client_callstack_rstack( + uuid: Uuid, + client_thread_id: ClientThreadId, + ui: Sender, +) -> std::result::Result<(), Box> +{ + if client_thread_id.process_id != std::process::id() { + capture_client_callstack_unsupported(uuid, client_thread_id, ui).await + } else { + // The caller requested trace from the process itself. + // This should only happen in unit tests. + // Process cannot capture callstack from itself which is why the operation is delegated to rstack_launcher + // helper library available in tests. + + #[cfg(test)] + { + let thread = rstack_launcher::capture_self(client_thread_id.thread_id)?; + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Callstack(callstack::Thread::from(&thread)), + }, + ))?; + } + + #[cfg(not(test))] + { + capture_client_callstack_unsupported(uuid, client_thread_id, ui).await?; + } + Ok(()) + } +} + +async fn capture_client_callstack_unsupported( + uuid: Uuid, + _client_thread_id: ClientThreadId, + ui: Sender, +) -> std::result::Result<(), Box> +{ + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Unsupported, + }, + ))?; + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index c373346..b9bc9cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use crossterm::{ }; use log::error; use snafu::{ResultExt, Snafu}; +use std::env; use std::fs::File; use std::io::stdout; use std::io::Read; @@ -89,6 +90,11 @@ pub struct ProxyFilter fn main() { + // Launched to capture stack? + if env::args_os().len() == 2 && env::args_os().any(|p| p == "child") { + return; + } + match proxide_main() { Ok(_) => (), Err(e) => { @@ -430,6 +436,7 @@ mod test use tokio::time::Instant; use crate::session::events::SessionEvent; + use crate::session::ClientCallstack; use crate::ConnectionOptions; lazy_static! { @@ -570,15 +577,22 @@ mod test // UI channel should be constantly receiving client callstack events. // The generator includes the process id and the thread id in the messages it sends. - let mut client_callstack_received = false; + let mut client_callstack_received: Option = None; let timeout_at = Instant::now().add(Duration::from_secs(30)); while let Some(message) = tokio::select! { result = message_rx.recv() => result, _t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ), error = error_monitor.recv() => panic!( "{:?}", error ), } { - if let SessionEvent::ClientCallstackProcessed(..) = message { - client_callstack_received = true; + // Try to collect a valid callstack. + // The capture process has a throttling mechanism which may skip some captures. + if let SessionEvent::ClientCallstackProcessed(event) = message { + match event.callstack { + ClientCallstack::Callstack(thread) => client_callstack_received = Some(thread), + ClientCallstack::Throttled => {} + ClientCallstack::Unsupported => break, + ClientCallstack::Error(error) => panic!("{:?}", error), + } break; } else if Instant::now() > timeout_at { panic!("Timeout") @@ -586,7 +600,18 @@ mod test } // Ensure the ui channel was not closed prematurely. - assert!(client_callstack_received); + #[cfg(target_os = "linux")] + { + let client_callstack_received = + client_callstack_received.expect("Client callstack unavailable."); + assert_eq!(client_callstack_received.name(), "grpc-generator"); + } + + // Verify callstack 1with tne new supported OS as well. + #[cfg(not(target_os = "linux"))] + { + assert!(client_callstack_received.is_none()); + } let mut server = tester.stop_generator().expect("Stopping generator failed."); abort_tx.send(()).expect("Stopping proxide failed."); diff --git a/src/session.rs b/src/session.rs index 971a2ed..09da084 100644 --- a/src/session.rs +++ b/src/session.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use uuid::Uuid; +pub mod callstack; pub mod events; pub mod serialization; @@ -132,6 +133,22 @@ pub enum ClientCallstack { /// Proxide does not support callstack capture on the current platform/operating system. Unsupported, + + /// The maximum number of parallel callstack captures was reached. + Throttled, + + /// Captured client thread with its callstack. + Callstack(crate::session::callstack::Thread), + + /// An error occurred during the capture process. + Error(ClientCallstackError), +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub enum ClientCallstackError +{ + /// An internal error to proxide occurred while capturing or processing the callstack. + Internal(String), } impl IndexedVec diff --git a/src/session/callstack.rs b/src/session/callstack.rs new file mode 100644 index 0000000..f8cf044 --- /dev/null +++ b/src/session/callstack.rs @@ -0,0 +1,131 @@ +use serde::{Deserialize, Serialize}; + +/// UI visualization types for callstack captures. + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Thread +{ + /// Identity of the thread. + id: i64, + + /// Name of the thread. + name: String, + + /// Captured stack frames of the thread. + frames: Vec, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Frame +{ + symbols: Vec, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +pub struct Symbol +{ + name: String, +} + +impl Thread +{ + pub fn id(&self) -> i64 + { + self.id + } + + pub fn name(&self) -> &str + { + &self.name + } + + pub fn frames(&self) -> &[Frame] + { + &self.frames + } +} + +impl Frame +{ + pub fn symbols(&self) -> &[Symbol] + { + &self.symbols + } +} + +impl Symbol +{ + pub fn name(&self) -> &str + { + &self.name + } +} + +#[cfg(target_os = "linux")] +impl From<&rstack::Thread> for Thread +{ + fn from(value: &rstack::Thread) -> Self + { + Self { + id: value.id() as i64, + name: value.name().unwrap_or("").to_string(), + frames: value.frames().iter().map(Frame::from).collect(), + } + } +} + +#[cfg(target_os = "linux")] +impl From<&rstack::Frame> for Frame +{ + fn from(value: &rstack::Frame) -> Self + { + Frame { + symbols: value.symbol().iter().map(|s| Symbol::from(*s)).collect(), + } + } +} + +#[cfg(target_os = "linux")] +impl From<&rstack::Symbol> for Symbol +{ + fn from(value: &rstack::Symbol) -> Self + { + Self { + name: value.name().to_string(), + } + } +} + +#[cfg(all(target_os = "linux", test))] +impl From<&rstack_self::Thread> for Thread +{ + fn from(value: &rstack_self::Thread) -> Self + { + Self { + id: value.id() as i64, + name: value.name().to_string(), + frames: value.frames().iter().map(Frame::from).collect(), + } + } +} + +#[cfg(all(target_os = "linux", test))] +impl From<&rstack_self::Frame> for Frame +{ + fn from(value: &rstack_self::Frame) -> Self + { + Self { + symbols: value.symbols().iter().map(Symbol::from).collect(), + } + } +} +#[cfg(all(target_os = "linux", test))] +impl From<&rstack_self::Symbol> for Symbol +{ + fn from(value: &rstack_self::Symbol) -> Self + { + Symbol { + name: value.name().expect("Name missing").to_string(), + } + } +} diff --git a/src/ui/toast.rs b/src/ui/toast.rs index a730170..d545230 100644 --- a/src/ui/toast.rs +++ b/src/ui/toast.rs @@ -51,7 +51,9 @@ impl PartialEq for FutureEvent { fn eq(&self, other: &Self) -> bool { - self.instant.eq(&other.instant) + // clippy reports an "unconditional recursion"false positive here in the pipeline with: + // "self.instant.eq(&other.instant)" + PartialEq::::eq(&self.instant, &other.instant) } } diff --git a/src/ui/views/callstack_view.rs b/src/ui/views/callstack_view.rs index ed843a9..cfff488 100644 --- a/src/ui/views/callstack_view.rs +++ b/src/ui/views/callstack_view.rs @@ -33,11 +33,18 @@ impl View for CallstackView client_thread.process_id(), client_thread.thread_id() ); - let message = match request.request_data.client_callstack { + let message: String = match &request.request_data.client_callstack { Some(ClientCallstack::Unsupported) => { - "Callstack unavailable:\n* Unsupported operating system." - } - None => ".. (Pending)", + "Callstack unavailable:\n* Unsupported operating system.".to_string() + }, + Some(ClientCallstack::Throttled) => { + "Callstack unavailable:\n* The maximum number of parallel callstack capture operations was reached.".to_string() + }, + Some(ClientCallstack::Callstack( thread)) => message_from_thread( thread ), + Some(ClientCallstack::Error(error)) => { + format!("{:?}", error) + }, + None => ".. (Pending)".to_string(), }; let block = create_block(&title); let request_data = Paragraph::new(message) @@ -86,3 +93,19 @@ impl View for CallstackView ) } } + +fn message_from_thread(thread: &crate::session::callstack::Thread) -> String +{ + let title = format!("{} ({})", thread.name(), thread.id()); + let callstack = thread + .frames() + .iter() + .flat_map(|f| f.symbols()) + .map(|s| s.name()) + .fold(String::default(), |mut acc, name| { + acc.push_str(name); + acc.push('\n'); + acc + }); + format!("{}\n\n{}", title, callstack) +} diff --git a/test/rstack-child/Cargo.lock b/test/rstack-child/Cargo.lock new file mode 100644 index 0000000..6d855fd --- /dev/null +++ b/test/rstack-child/Cargo.lock @@ -0,0 +1,260 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "foreign-types-shared" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.151" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memchr" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "pkg-config" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" + +[[package]] +name = "proc-macro2" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rstack" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7df9d3ebd4f17b52e6134efe2fa20021c80688cbe823d481a729a993b730493" +dependencies = [ + "cfg-if", + "libc", + "log", + "unwind", +] + +[[package]] +name = "rstack-child" +version = "0.1.0" +dependencies = [ + "rstack-self", +] + +[[package]] +name = "rstack-self" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd5030da3aba0ec731502f74ec38e63798eea6bc8b8ba5972129afe3eababd2" +dependencies = [ + "antidote", + "backtrace", + "bincode", + "lazy_static", + "libc", + "rstack", + "serde", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "serde" +version = "1.0.194" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.194" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "2.0.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unwind" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38290439f8459ba56c4bf15fc776463f495fefc4f0112f87a1a075540441b083" +dependencies = [ + "foreign-types", + "libc", + "unwind-sys", +] + +[[package]] +name = "unwind-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a81ba64bc45243d442e9bb2a362f303df152b5078c56ce4a0dc7d813c8df91" +dependencies = [ + "libc", + "pkg-config", +] diff --git a/test/rstack-child/Cargo.toml b/test/rstack-child/Cargo.toml new file mode 100644 index 0000000..15017b6 --- /dev/null +++ b/test/rstack-child/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "rstack-child" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[target.'cfg(unix)'.dependencies] +rstack-self = "0.3.0" \ No newline at end of file diff --git a/test/rstack-child/src/main.rs b/test/rstack-child/src/main.rs new file mode 100644 index 0000000..ad5064f --- /dev/null +++ b/test/rstack-child/src/main.rs @@ -0,0 +1,13 @@ +fn main() { + #[cfg(target_os = "linux")] + { + let err = rstack_self::child(); + eprintln!("{:?}", err); + err.expect("Capturing callstack with rstack-self failed."); + } + + #[cfg(not(target_os = "linux"))] + { + panic!("Unsupported operating system."); + } +} diff --git a/test/rstack-launcher/Cargo.lock b/test/rstack-launcher/Cargo.lock new file mode 100644 index 0000000..6088009 --- /dev/null +++ b/test/rstack-launcher/Cargo.lock @@ -0,0 +1,312 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "escargot" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "768064bd3a0e2bedcba91dc87ace90beea91acc41b6a01a3ca8e9aa8827461bf" +dependencies = [ + "log", + "once_cell", + "serde", + "serde_json", +] + +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "foreign-types-shared" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.151" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memchr" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "os-id" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510856ec55c552d86db0d675df95c32b87f28cfe1cdc47d3eba2342c39a0a5f6" +dependencies = [ + "libc", +] + +[[package]] +name = "pkg-config" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" + +[[package]] +name = "proc-macro2" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907a61bd0f64c2f29cd1cf1dc34d05176426a3f504a78010f08416ddb7b13708" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rstack" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7df9d3ebd4f17b52e6134efe2fa20021c80688cbe823d481a729a993b730493" +dependencies = [ + "cfg-if", + "libc", + "log", + "unwind", +] + +[[package]] +name = "rstack-launcher" +version = "0.1.0" +dependencies = [ + "escargot", + "os-id", + "rstack-self", +] + +[[package]] +name = "rstack-self" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd5030da3aba0ec731502f74ec38e63798eea6bc8b8ba5972129afe3eababd2" +dependencies = [ + "antidote", + "backtrace", + "bincode", + "lazy_static", + "libc", + "rstack", + "serde", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "ryu" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" + +[[package]] +name = "serde" +version = "1.0.194" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.194" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1726efe18f42ae774cc644f330953a5e7b3c3003d3edcecf18850fe9d4dd9afb" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unwind" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38290439f8459ba56c4bf15fc776463f495fefc4f0112f87a1a075540441b083" +dependencies = [ + "foreign-types", + "libc", + "unwind-sys", +] + +[[package]] +name = "unwind-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7a81ba64bc45243d442e9bb2a362f303df152b5078c56ce4a0dc7d813c8df91" +dependencies = [ + "libc", + "pkg-config", +] diff --git a/test/rstack-launcher/Cargo.toml b/test/rstack-launcher/Cargo.toml new file mode 100644 index 0000000..41f0b8f --- /dev/null +++ b/test/rstack-launcher/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "rstack-launcher" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +os-id = "3.0.1" + +[target.'cfg(unix)'.dependencies] +rstack-self = "0.3.0" + +[build-dependencies] +escargot = "0.5.8" diff --git a/test/rstack-launcher/build.rs b/test/rstack-launcher/build.rs new file mode 100644 index 0000000..9e663f4 --- /dev/null +++ b/test/rstack-launcher/build.rs @@ -0,0 +1,36 @@ +use std::fs::File; +use std::io::Write; +use std::path::Path; + +fn main() +{ + #[cfg(target_os = "linux")] + { + let out_dir = std::env::var("OUT_DIR").expect("Output directory unavailable."); + let run = escargot::CargoBuild::new() + .current_release() + .current_target() + .manifest_path("../rstack-child/Cargo.toml") + .target_dir(&out_dir) + .run() + .expect("Compiling rstack-child failed."); + + let child_template = r#" + fn launch_child() -> rstack_self::Result { + let exe = "PATH"; + Ok(rstack_self::trace(&mut Command::new(exe))?) + } + "#; + let child_template = child_template.replace( + "PATH", + run.path().to_str().expect("Unexpected characters in path."), + ); + + let dest_path = Path::new(&out_dir).join("child.rs"); + let mut f = File::create(&dest_path).expect("Opening child.rs failed."); + f.write_all(child_template.as_bytes()) + .expect("Writing child.rs failed."); + } + + println!("cargo:rerun-if-changed=build.rs"); +} diff --git a/test/rstack-launcher/src/lib.rs b/test/rstack-launcher/src/lib.rs new file mode 100644 index 0000000..dae838c --- /dev/null +++ b/test/rstack-launcher/src/lib.rs @@ -0,0 +1,93 @@ +use std::fmt::{Display, Formatter}; +use std::process::Command; + +#[cfg(target_os = "linux")] +include!(concat!(env!("OUT_DIR"), "/child.rs")); + +/// An error in launching the child. +#[derive(Debug)] +pub enum Error +{ + /// The error originates from rstack_self. + Rstack(rstack_self::Error), + + /// The specified thread was not available. + ThreadNotFound, + + /// Unsuportted operating system. + UnsupportedOperatingSystem, +} + +/// The result type returned by methods in this crate. +pub type Result = std::result::Result; + +/// Captures the callstack of a thread of the calling process. +pub fn capture_self(thread_id: i64) -> Result +{ + #[cfg(target_os = "linux")] + { + let trace: rstack_self::Trace = launch_child()?; + match trace + .threads() + .into_iter() + .find(|&t| t.id() as i64 == thread_id) + { + Some(thread) => Ok(thread.clone()), + None => Err(Error::ThreadNotFound), + } + } + + #[cfg(not(target_os = "linux"))] + Err(Error::UnsupportedOperatingSystem) +} + +impl From for Error +{ + fn from(value: rstack_self::Error) -> Self + { + Self::Rstack(value) + } +} + +impl Display for Error +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result + { + match self { + Error::Rstack(error) => Ok(error.fmt(f)?), + Error::ThreadNotFound => write!(f, "Specified thread unavailable."), + Error::UnsupportedOperatingSystem => { + write!(f, "Capture not supported on the current operatins system.") + } + } + } +} + +impl std::error::Error for Error {} + +#[cfg(test)] +mod test +{ + use crate::capture_self; + + #[test] + fn capturing_callstack_succeeds() + { + let thread_id = get_current_native_thread_id(); + let callstack = capture_self(thread_id).expect("Capturing self failed"); + assert_eq!(callstack.id() as i64, thread_id); + assert_eq!(callstack.name(), "test::capturing"); + } + + /// Gets the current native thread id. + fn get_current_native_thread_id() -> i64 + { + #[cfg(not(target_os = "windows"))] + return os_id::thread::get_raw_id() as i64; + + #[cfg(target_os = "windows")] + unsafe { + return windows::Win32::System::Threading::GetCurrentThreadId() as i64; + } + } +} From 76c3d24c6010e8cf1a64d9a43262246e3610a06c Mon Sep 17 00:00:00 2001 From: fluxie Date: Tue, 16 Jan 2024 15:13:07 +0200 Subject: [PATCH 5/6] Fixed parallel capture limiter The limiter was not shared between connections. --- src/connection.rs | 52 ++++++++++++++++++++++++++++++++++++++--- src/connection/http2.rs | 37 +++++------------------------ src/main.rs | 11 +++++++-- 3 files changed, 64 insertions(+), 36 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 3003e62..d5a0ca0 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,6 +6,7 @@ use std::sync::mpsc::Sender; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use tokio::sync::{Semaphore, TryAcquireError}; use uuid::Uuid; use crate::session::events::*; @@ -126,6 +127,43 @@ impl Streams } } +/// Manages the asynchronous auxiliary processing of connections and associated requests. +pub struct ProcessingControl +{ + /// Limits the number of parallel callstack captures. + callstack_capture_limiter: Semaphore, +} + +impl ProcessingControl +{ + pub fn new() -> Arc + { + // rstack / libunwind does not allow multiple captures in parallel. + // The limit must be, for now, always '1'. + // let parallel_callstack_capture_limit = if cfg!(not(test)) { 5 } else { 1 }; + let parallel_callstack_capture_limit = 1; + Arc::new(Self { + callstack_capture_limiter: Semaphore::new(parallel_callstack_capture_limit), + }) + } + + /// Requests permissions to capture a client callstack. + async fn acquire_callstack_capture_permit(&self) -> Result> + { + // TODO Add option for forcing callstack capture for all requests and wait for the permit here. + match self.callstack_capture_limiter.try_acquire() { + Ok(permit) => Ok(Some(permit)), + Err(TryAcquireError::NoPermits) => Ok(None), + Err(TryAcquireError::Closed) => Err(Error::ClientError { + scenario: "Unable to capture callstack", + source: EndpointError::ProxideError { + reason: "Permit limiter has been closed.", + }, + }), + } + } +} + /// When available, identifies the thread in the calling or client process. /// The client should reports its process id with the proxide-client-process-id" header and /// the thread id with the "proxide-client-thread-id" header. @@ -190,6 +228,7 @@ pub async fn run( client: TcpStream, src_addr: SocketAddr, options: Arc, + processing_control: Arc, ui: Sender, ) -> Result<()> { @@ -198,7 +237,7 @@ pub async fn run( protocol_stack: vec![], opaque_redirect: None, }; - connect_phase(details, client, src_addr, options, ui).await + connect_phase(details, client, src_addr, options, processing_control, ui).await } /// Establishes the connection to the server. @@ -210,6 +249,7 @@ pub async fn connect_phase( client: TcpStream, src_addr: SocketAddr, options: Arc, + processing_control: Arc, ui: Sender, ) -> Result<()> { @@ -265,6 +305,7 @@ pub async fn connect_phase( src_addr, connect_data.target_server, options, + processing_control, ui, ) .await @@ -309,12 +350,16 @@ pub async fn connect_phase( src_addr, target_server.to_string(), options, + processing_control, ui, ) .await } } +/// Delegates the connections to appropriate handler based on the protocol. +/// TODO Fix clippy warning. Parameters are all created or consumed at different locations => difficult to group. +#[allow(clippy::too_many_arguments)] pub async fn handle_protocol( mut details: ConnectionDetails, protocol: demux::Protocol, @@ -322,6 +367,7 @@ pub async fn handle_protocol( src_addr: SocketAddr, target: String, options: Arc, + processing_control: Arc, ui: Sender, ) -> Result<()> where @@ -331,9 +377,9 @@ where let ui_clone = ui.clone(); if protocol == demux::Protocol::Tls { let streams = tls::handle(&mut details, streams, options.clone(), target).await?; - http2::handle(details, src_addr, streams, ui_clone).await?; + http2::handle(details, src_addr, streams, processing_control, ui_clone).await?; } else { - http2::handle(details, src_addr, streams, ui_clone).await?; + http2::handle(details, src_addr, streams, processing_control, ui_clone).await?; } Ok(()) diff --git a/src/connection/http2.rs b/src/connection/http2.rs index cf18658..bfb928c 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -15,7 +15,6 @@ use std::sync::mpsc::Sender; use std::task::{Context, Poll}; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError}; use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; @@ -25,6 +24,7 @@ pub async fn handle( mut details: ConnectionDetails, client_addr: SocketAddr, streams: Streams, + processing_control: Arc, ui: Sender, ) -> Result<()> where @@ -92,7 +92,6 @@ where // The client_connection will produce individual HTTP request that we'll accept. // These requests will be handled in parallel by spawning them into their own // tasks. - let processing_control = ProcessingControl::new(); while let Some(request) = client_connection.accept().await { let (client_request, client_response) = request.context(H2Error {}).context(ClientError { @@ -151,12 +150,6 @@ pub struct ProxyRequest request_processor: ProcessingFuture, } -/// Manages the asynchronous auxiliary processing of requests. -struct ProcessingControl -{ - callstack_capture_limiter: Semaphore, -} - struct ProcessingFuture { inner: JoinHandle<()>, @@ -520,8 +513,8 @@ impl ProcessingFuture { // Capturing the callstacks is a very expensive operation // Capturing is throttled with the semaphore in processing_control - match processing_control.try_request_capture_callstack_permit() { - Ok(_) => { + match processing_control.acquire_callstack_capture_permit().await? { + Some(_) => { // TODO Callstack capture: Add support for other operating systems. #[cfg(target_os = "linux")] capture_client_callstack_rstack(uuid, client_thread_id, ui).await?; @@ -530,8 +523,8 @@ impl ProcessingFuture capture_client_callstack_unsupported(uuid, client_thread_id, ui).await?; Ok(()) - } - Err(TryAcquireError::NoPermits) => { + }, + None => { ui.send(SessionEvent::ClientCallstackProcessed( ClientCallstackProcessedEvent { uuid, @@ -539,8 +532,7 @@ impl ProcessingFuture }, ))?; Ok(()) - } - Err(e) => Err(Box::from(e)), + }, } } } @@ -558,23 +550,6 @@ impl Future for ProcessingFuture } } -impl ProcessingControl -{ - fn new() -> Arc - { - let parallel_callstack_capture_limit = if cfg!(not(test)) { 5 } else { 1 }; - Arc::new(Self { - callstack_capture_limiter: Semaphore::new(parallel_callstack_capture_limit), - }) - } - - /// Requests permissions to capture a cleint callstack. - fn try_request_capture_callstack_permit(&self) -> Result, TryAcquireError> - { - self.callstack_capture_limiter.try_acquire() - } -} - #[cfg(target_os = "linux")] async fn capture_client_callstack_rstack( uuid: Uuid, diff --git a/src/main.rs b/src/main.rs index b9bc9cc..fb5423b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -391,10 +391,16 @@ fn spawn_accept( ui_tx: Sender, ) { + let processing_control = connection::ProcessingControl::new(); tokio::spawn(async move { loop { let ui_tx = ui_tx.clone(); - new_connection(ui_tx, listener.accept().await, options.clone()); + new_connection( + ui_tx, + listener.accept().await, + options.clone(), + processing_control.clone(), + ); } }); } @@ -403,13 +409,14 @@ fn new_connection( tx: Sender, result: Result<(TcpStream, SocketAddr), std::io::Error>, options: Arc, + processing_control: Arc, ) { // Process the new connection by spawning a new tokio task. This allows the original task to // process more connections. if let Ok((socket, src_addr)) = result { tokio::spawn(async move { - match run(socket, src_addr, options, tx).await { + match run(socket, src_addr, options, processing_control, tx).await { Ok(..) => {} Err(e) => error!("Connection error\n{}", e), } From bb6d81f36045a96e9e88df69953d51cf4dd46651 Mon Sep 17 00:00:00 2001 From: fluxie Date: Tue, 16 Jan 2024 15:46:27 +0200 Subject: [PATCH 6/6] Capture client's callstacks with rstack The capture is done if the client provided the process id and the thread id of the client's thread for the capture. A parametrization will be added in the future for better usability. --- src/connection/http2.rs | 64 ++++++++++++++++++++++++++++++++++++++--- src/session.rs | 3 ++ 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/src/connection/http2.rs b/src/connection/http2.rs index bfb928c..14f331f 100644 --- a/src/connection/http2.rs +++ b/src/connection/http2.rs @@ -510,10 +510,43 @@ impl ProcessingFuture processing_control: Arc, ui: Sender, ) -> std::result::Result<(), Box> + { + match ProcessingFuture::try_capture_client_callstack( + uuid, + client_thread_id, + processing_control, + ui.clone(), + ) + .await + { + Ok(_) => {} + Err(error) => { + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { + uuid, + callstack: ClientCallstack::Error(ClientCallstackError::Internal( + error.to_string(), + )), + }, + ))?; + } + } + Ok(()) + } + + async fn try_capture_client_callstack( + uuid: Uuid, + client_thread_id: ClientThreadId, + processing_control: Arc, + ui: Sender, + ) -> std::result::Result<(), Box> { // Capturing the callstacks is a very expensive operation // Capturing is throttled with the semaphore in processing_control - match processing_control.acquire_callstack_capture_permit().await? { + match processing_control + .acquire_callstack_capture_permit() + .await? + { Some(_) => { // TODO Callstack capture: Add support for other operating systems. #[cfg(target_os = "linux")] @@ -523,7 +556,7 @@ impl ProcessingFuture capture_client_callstack_unsupported(uuid, client_thread_id, ui).await?; Ok(()) - }, + } None => { ui.send(SessionEvent::ClientCallstackProcessed( ClientCallstackProcessedEvent { @@ -532,7 +565,7 @@ impl ProcessingFuture }, ))?; Ok(()) - }, + } } } } @@ -558,7 +591,29 @@ async fn capture_client_callstack_rstack( ) -> std::result::Result<(), Box> { if client_thread_id.process_id != std::process::id() { - capture_client_callstack_unsupported(uuid, client_thread_id, ui).await + let process = match rstack::TraceOptions::new() + .thread_names(true) + .symbols(true) + .trace(client_thread_id.process_id()) + { + Ok(process) => process, + Err(error) => return Err(Box::new(error)), + }; + + let callstack = match process + .threads() + .iter() + .find(|t| t.id() as i64 == client_thread_id.thread_id) + { + Some(thread) => ClientCallstack::Callstack(callstack::Thread::from(thread)), + None => ClientCallstack::Error(ClientCallstackError::ThreadUnavailable( + client_thread_id.thread_id(), + )), + }; + ui.send(SessionEvent::ClientCallstackProcessed( + ClientCallstackProcessedEvent { uuid, callstack }, + ))?; + Ok(()) } else { // The caller requested trace from the process itself. // This should only happen in unit tests. @@ -584,6 +639,7 @@ async fn capture_client_callstack_rstack( } } +#[allow(dead_code)] async fn capture_client_callstack_unsupported( uuid: Uuid, _client_thread_id: ClientThreadId, diff --git a/src/session.rs b/src/session.rs index 09da084..b69f2f2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -149,6 +149,9 @@ pub enum ClientCallstackError { /// An internal error to proxide occurred while capturing or processing the callstack. Internal(String), + + /// The specified thread was not available in the client process. + ThreadUnavailable(i64), } impl IndexedVec