From 5b7fdd54f4b729e9510daf1e07ad8dc1bf12a2bb Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 10:25:57 -0500 Subject: [PATCH 01/12] stash a tower service example --- crates/mcp-client/examples/svc.rs | 54 +++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 crates/mcp-client/examples/svc.rs diff --git a/crates/mcp-client/examples/svc.rs b/crates/mcp-client/examples/svc.rs new file mode 100644 index 000000000..cf0952e14 --- /dev/null +++ b/crates/mcp-client/examples/svc.rs @@ -0,0 +1,54 @@ +use tokio::time::{sleep, Duration}; +use tower::{Service, ServiceBuilder}; +use std::task::{Context, Poll}; +use futures::future::{BoxFuture, FutureExt}; + +// Define a simple service that takes some time to respond +struct SlowService; + +impl Service for SlowService { + type Response = String; + type Error = &'static str; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: String) -> Self::Future { + println!("Processing request: {}", req); + + // Use an async block to create the future + async move { + // Simulate a slow response + sleep(Duration::from_secs(3)).await; + Ok(format!("Processed: {}", req)) + } + .boxed() // Convert the future into a BoxFuture + } +} + +#[tokio::main] +async fn main() { + // Create the base service + let service = SlowService; + + // Wrap the service with a timeout layer + let timeout_service = ServiceBuilder::new() + .timeout(Duration::from_secs(1)) + .service(service); + + // Use the timeout-wrapped service + let mut svc = timeout_service; + + match svc.call("Hello Tower!".to_string()).await { + Ok(response) => println!("Response: {}", response), + Err(err) => { + if let Some(_elapsed) = err.downcast_ref::() { + println!("Error: Timed out"); + } else { + println!("Error: {:?}", err); + } + } + } +} From 75dd23199d1e96b7a998beb4ae4366a941e9715b Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 10:37:21 -0500 Subject: [PATCH 02/12] update goosehints and cursorrules --- .cursorrules | 276 +++++++++++++++++++++++++++++++++++++++++++++++++-- .goosehints | 275 ++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 539 insertions(+), 12 deletions(-) diff --git a/.cursorrules b/.cursorrules index 4eaa69991..22b7abb48 100644 --- a/.cursorrules +++ b/.cursorrules @@ -1,8 +1,86 @@ -You are an expert programmer in Rust teaching who is teaching another developer who is learning Rust. -The students are familiar with programming in languages such as Python (advanced), Java (novice) and C (novice) so -when possible use analogies from those languages. +Anthropic just released Model Context Protocol (MCP), a new standard for connecting AI assistants to the systems where data lives, including content repositories, business tools, and development environments. The Model Context Protocol is an open standard that enables developers to build secure, two-way connections between their data sources and AI-powered tools. The architecture is straightforward: developers can either expose their data through MCP servers or build AI applications (MCP clients) that connect to these servers. + +Model Context Protocol (MCP), is heavily inspired by Microsoft's Language Server Protocol (LSP) used in IDEs. + +# MCP Protocol + +## Lifecycle + +The Model Context Protocol (MCP) defines a rigorous lifecycle for client-server connections that ensures proper capability negotiation and state management. + +3 phases: +1. Initialization: Capability negotiation and protocol version agreement +2. Operation: Normal protocol communication +3. Shutdown: Graceful termination of the connection + + +### Initialization +The initialization phase MUST be the first interaction between client and server. During this phase, the client and server: +- Establish protocol version compatibility +- Exchange and negotiate capabilities +- Share implementation details + +The client MUST initiate this phase by sending an initialize request containing: +- Protocol version supported +- Client capabilities +- Client implementation information + +### Operation +During the operation phase, the client and server exchange messages according to the negotiated capabilities. + +Both parties SHOULD: +- Respect the negotiated protocol version +- Only use capabilities that were successfully negotiated + +### Shutdown +During the shutdown phase, one side (usually the client) cleanly terminates the protocol connection. No specific shutdown messages are defined—instead, the underlying transport mechanism should be used to signal connection termination: + +#### stdio +For the stdio transport, the client SHOULD initiate shutdown by: + +1. First, closing the input stream to the child process (the server) +2. Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time +3. Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM + +The server MAY initiate shutdown by closing its output stream to the client and exiting. + +#### HTTP with SSE + +For HTTP transports, shutdown is indicated by closing the associated HTTP connection(s). + + +## Transports + +2 standard transport mechanisms for client-server communication: + +1. stdio, communication over standard in and standard out +2. HTTP with Server-Sent Events (SSE) + +### stdio + +- The client launches the MCP server as a subprocess. +- The server receives JSON-RPC messages on its standard input (stdin) and writes responses to its standard output (stdout). +- Messages are delimited by newlines, and MUST NOT contain embedded newlines. +- The server MAY write UTF-8 strings to its standard error (stderr) for logging purposes. Clients MAY capture, forward, or ignore this logging. +- The server MUST NOT write anything to its stdout that is not a valid MCP message. +- The client MUST NOT write anything to the server’s stdin that is not a valid MCP message. + +### HTTP with SSE + +In the SSE transport, the server operates as an independent process that can handle multiple client connections. + +The server MUST provide two endpoints: +1. An SSE endpoint, for clients to establish a connection and receive messages from the server +2. A regular HTTP POST endpoint for clients to send messages to the server + +When a client connects, the server MUST send an endpoint event containing a URI for the client to use for sending messages. All subsequent client messages MUST be sent as HTTP POST requests to this endpoint. + +Server messages are sent as SSE message events, with the message content encoded as JSON in the event data. + +------------------ + +# General Guidelines for writing Rust code -Key Principles - Write clear, concise, and idiomatic Rust code with accurate examples. - Use async programming paradigms effectively, leveraging `tokio` for concurrency. - Prioritize modularity, clean code organization, and efficient resource management. @@ -11,6 +89,21 @@ Key Principles - Avoid code duplication; use functions and modules to encapsulate reusable logic. - Write code with safety, concurrency, and performance in mind, embracing Rust's ownership and type system. +Async Programming +- Use `tokio` as the async runtime for handling asynchronous tasks and I/O. +- Implement async functions using `async fn` syntax. +- Leverage `tokio::spawn` for task spawning and concurrency. +- Use `tokio::select!` for managing multiple async tasks and cancellations. +- Favor structured concurrency: prefer scoped tasks and clean cancellation paths. +- Implement timeouts, retries, and backoff strategies for robust async operations. + +Channels and Concurrency +- Use Rust's `tokio::sync::mpsc` for asynchronous, multi-producer, single-consumer channels. +- Use `tokio::sync::broadcast` for broadcasting messages to multiple consumers. +- Implement `tokio::sync::oneshot` for one-time communication between tasks. +- Prefer bounded channels for backpressure; handle capacity limits gracefully. +- Use `tokio::sync::Mutex` and `tokio::sync::RwLock` for shared state across tasks, avoiding deadlocks. + Error Handling and Safety - Embrace Rust's Result and Option types for error handling. - Use `?` operator to propagate errors in async functions. @@ -18,10 +111,181 @@ Error Handling and Safety - Handle errors and edge cases early, returning errors where appropriate. - Use `.await` responsibly, ensuring safe points for context switching. +Testing +- Write unit tests with `tokio::test` for async tests. +- Use `tokio::time::pause` for testing time-dependent code without real delays. +- Implement integration tests to validate async behavior and concurrency. +- Use mocks and fakes for external dependencies in tests. + +Performance Optimization +- Minimize async overhead; use sync code where async is not needed. +- Avoid blocking operations inside async functions; offload to dedicated blocking threads if necessary. +- Use `tokio::task::yield_now` to yield control in cooperative multitasking scenarios. +- Optimize data structures and algorithms for async use, reducing contention and lock duration. +- Use `tokio::time::sleep` and `tokio::time::interval` for efficient time-based operations. + Key Conventions 1. Structure the application into modules: separate concerns like networking, database, and business logic. 2. Use environment variables for configuration management (e.g., `dotenv` crate). 3. Ensure code is well-documented with inline comments and Rustdoc. -4. Do not use the older style of "MOD/mod.rs" for separing modules and instead use the "MOD.rs" filename convention. -Refer to "The Rust Programming Language" book (2024 version) and "Command line apps in Rust" documentation for in-depth information on best practices, and advanced features. +Async Ecosystem +- Use `tokio` for async runtime and task management. +- Leverage `hyper` or `reqwest` for async HTTP requests. +- Use `serde` for serialization/deserialization. + + +## Tower crate + +- Tower is a library of modular and reusable components for building networking clients and servers. +- Tower provides a simple core abstraction, the Service trait, which represents an asynchronous function taking a request and returning either a response or an error. This abstraction can be used to model both clients and servers. +```rust +pub trait Service { + type Response; + type Error; + type Future: Future>; + + // Required methods + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>; + fn call(&mut self, req: Request) -> Self::Future; +} +``` +- Generic components, like timeout, rate limiting, and load balancing, can be modeled as Services that wrap some inner service and apply additional behavior before or after the inner service is called. This allows implementing these components in a protocol-agnostic, composable way. Typically, such services are referred to as middleware. +- An additional abstraction, the Layer trait, is used to compose middleware with Services. If a Service can be thought of as an asynchronous function from a request type to a response type, a Layer is a function taking a Service of one type and returning a Service of a different type. The ServiceBuilder type is used to add middleware to a service by composing it with multiple Layers. + +Here is an example of using tower service with timeout: +``` +use tokio::time::{sleep, Duration}; +use tower::{Service, ServiceBuilder}; +use std::task::{Context, Poll}; +use futures::future::{BoxFuture, FutureExt}; + +// Define a simple service that takes some time to respond +struct SlowService; + +impl Service for SlowService { + type Response = String; + type Error = &'static str; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: String) -> Self::Future { + println!("Processing request: {}", req); + + // Use an async block to create the future + async move { + // Simulate a slow response + sleep(Duration::from_secs(3)).await; + Ok(format!("Processed: {}", req)) + } + .boxed() // Convert the future into a BoxFuture + } +} + +#[tokio::main] +async fn main() { + // Create the base service + let service = SlowService; + + // Wrap the service with a timeout layer + let timeout_service = ServiceBuilder::new() + .timeout(Duration::from_secs(1)) + .service(service); + + // Use the timeout-wrapped service + let mut svc = timeout_service; + + match svc.call("Hello Tower!".to_string()).await { + Ok(response) => println!("Response: {}", response), + Err(err) => { + if let Some(_elapsed) = err.downcast_ref::() { + println!("Error: Timed out"); + } else { + println!("Error: {:?}", err); + } + } + } +} +``` + +## Actors in Tokio + +- An actor is a self-contained task (usually spawned by an async runtime like Tokio) that runs independently and communicates with other parts of the system by sending and receiving messages. + - Each actor encapsulates some resource or job. + - Other tasks/actors access it by sending messages rather than by directly sharing data. + +- An actor is split into two parts: + 1. The Task – A run function or method that processes incoming messages in a loop, shutting down gracefully when no more messages can be received. + 2. The Handle – A struct that owns a mpsc::Sender (or other channel) and exposes async methods to send messages to the actor. + +A minimal actor example might look like this: +``` +use tokio::sync::{oneshot, mpsc}; + +// Message type(s) the actor can receive. +enum ActorMessage { + GetUniqueId { + respond_to: oneshot::Sender, + }, +} + +// The actor’s internal state and logic. +struct MyActor { + receiver: mpsc::Receiver, + next_id: u32, +} + +impl MyActor { + fn new(receiver: mpsc::Receiver) -> Self { + Self { receiver, next_id: 0 } + } + + fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::GetUniqueId { respond_to } => { + self.next_id += 1; + let _ = respond_to.send(self.next_id); // Ignore send errors + } + } + } +} + +// The actor’s main event loop. +async fn run_my_actor(mut actor: MyActor) { + while let Some(msg) = actor.receiver.recv().await { + actor.handle_message(msg); + } +} + +// The "handle": provides methods to send messages to the actor. +#[derive(Clone)] +struct MyActorHandle { + sender: mpsc::Sender, +} + +impl MyActorHandle { + // Creates both the sender and the spawned actor task. + pub fn new() -> Self { + let (sender, receiver) = mpsc::channel(8); + let actor = MyActor::new(receiver); + tokio::spawn(run_my_actor(actor)); + + Self { sender } + } + + // Example request-response method. + pub async fn get_unique_id(&self) -> u32 { + let (send, recv) = oneshot::channel(); + let msg = ActorMessage::GetUniqueId { respond_to: send }; + let _ = self.sender.send(msg).await; // Sends the message to the actor + recv.await.expect("Actor task was killed") + } +} +``` + diff --git a/.goosehints b/.goosehints index 4eaa69991..1a6e48eaa 100644 --- a/.goosehints +++ b/.goosehints @@ -1,8 +1,86 @@ -You are an expert programmer in Rust teaching who is teaching another developer who is learning Rust. -The students are familiar with programming in languages such as Python (advanced), Java (novice) and C (novice) so -when possible use analogies from those languages. +Anthropic just released Model Context Protocol (MCP), a new standard for connecting AI assistants to the systems where data lives, including content repositories, business tools, and development environments. The Model Context Protocol is an open standard that enables developers to build secure, two-way connections between their data sources and AI-powered tools. The architecture is straightforward: developers can either expose their data through MCP servers or build AI applications (MCP clients) that connect to these servers. + +Model Context Protocol (MCP), is heavily inspired by Microsoft's Language Server Protocol (LSP) used in IDEs. + +# MCP Protocol + +## Lifecycle + +The Model Context Protocol (MCP) defines a rigorous lifecycle for client-server connections that ensures proper capability negotiation and state management. + +3 phases: +1. Initialization: Capability negotiation and protocol version agreement +2. Operation: Normal protocol communication +3. Shutdown: Graceful termination of the connection + + +### Initialization +The initialization phase MUST be the first interaction between client and server. During this phase, the client and server: +- Establish protocol version compatibility +- Exchange and negotiate capabilities +- Share implementation details + +The client MUST initiate this phase by sending an initialize request containing: +- Protocol version supported +- Client capabilities +- Client implementation information + +### Operation +During the operation phase, the client and server exchange messages according to the negotiated capabilities. + +Both parties SHOULD: +- Respect the negotiated protocol version +- Only use capabilities that were successfully negotiated + +### Shutdown +During the shutdown phase, one side (usually the client) cleanly terminates the protocol connection. No specific shutdown messages are defined—instead, the underlying transport mechanism should be used to signal connection termination: + +#### stdio +For the stdio transport, the client SHOULD initiate shutdown by: + +1. First, closing the input stream to the child process (the server) +2. Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time +3. Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM + +The server MAY initiate shutdown by closing its output stream to the client and exiting. + +#### HTTP with SSE + +For HTTP transports, shutdown is indicated by closing the associated HTTP connection(s). + + +## Transports + +2 standard transport mechanisms for client-server communication: + +1. stdio, communication over standard in and standard out +2. HTTP with Server-Sent Events (SSE) + +### stdio + +- The client launches the MCP server as a subprocess. +- The server receives JSON-RPC messages on its standard input (stdin) and writes responses to its standard output (stdout). +- Messages are delimited by newlines, and MUST NOT contain embedded newlines. +- The server MAY write UTF-8 strings to its standard error (stderr) for logging purposes. Clients MAY capture, forward, or ignore this logging. +- The server MUST NOT write anything to its stdout that is not a valid MCP message. +- The client MUST NOT write anything to the server’s stdin that is not a valid MCP message. + +### HTTP with SSE + +In the SSE transport, the server operates as an independent process that can handle multiple client connections. + +The server MUST provide two endpoints: +1. An SSE endpoint, for clients to establish a connection and receive messages from the server +2. A regular HTTP POST endpoint for clients to send messages to the server + +When a client connects, the server MUST send an endpoint event containing a URI for the client to use for sending messages. All subsequent client messages MUST be sent as HTTP POST requests to this endpoint. + +Server messages are sent as SSE message events, with the message content encoded as JSON in the event data. + +------------------ + +# General Guidelines for writing Rust code -Key Principles - Write clear, concise, and idiomatic Rust code with accurate examples. - Use async programming paradigms effectively, leveraging `tokio` for concurrency. - Prioritize modularity, clean code organization, and efficient resource management. @@ -11,6 +89,21 @@ Key Principles - Avoid code duplication; use functions and modules to encapsulate reusable logic. - Write code with safety, concurrency, and performance in mind, embracing Rust's ownership and type system. +Async Programming +- Use `tokio` as the async runtime for handling asynchronous tasks and I/O. +- Implement async functions using `async fn` syntax. +- Leverage `tokio::spawn` for task spawning and concurrency. +- Use `tokio::select!` for managing multiple async tasks and cancellations. +- Favor structured concurrency: prefer scoped tasks and clean cancellation paths. +- Implement timeouts, retries, and backoff strategies for robust async operations. + +Channels and Concurrency +- Use Rust's `tokio::sync::mpsc` for asynchronous, multi-producer, single-consumer channels. +- Use `tokio::sync::broadcast` for broadcasting messages to multiple consumers. +- Implement `tokio::sync::oneshot` for one-time communication between tasks. +- Prefer bounded channels for backpressure; handle capacity limits gracefully. +- Use `tokio::sync::Mutex` and `tokio::sync::RwLock` for shared state across tasks, avoiding deadlocks. + Error Handling and Safety - Embrace Rust's Result and Option types for error handling. - Use `?` operator to propagate errors in async functions. @@ -18,10 +111,180 @@ Error Handling and Safety - Handle errors and edge cases early, returning errors where appropriate. - Use `.await` responsibly, ensuring safe points for context switching. +Testing +- Write unit tests with `tokio::test` for async tests. +- Use `tokio::time::pause` for testing time-dependent code without real delays. +- Implement integration tests to validate async behavior and concurrency. +- Use mocks and fakes for external dependencies in tests. + +Performance Optimization +- Minimize async overhead; use sync code where async is not needed. +- Avoid blocking operations inside async functions; offload to dedicated blocking threads if necessary. +- Use `tokio::task::yield_now` to yield control in cooperative multitasking scenarios. +- Optimize data structures and algorithms for async use, reducing contention and lock duration. +- Use `tokio::time::sleep` and `tokio::time::interval` for efficient time-based operations. + Key Conventions 1. Structure the application into modules: separate concerns like networking, database, and business logic. 2. Use environment variables for configuration management (e.g., `dotenv` crate). 3. Ensure code is well-documented with inline comments and Rustdoc. -4. Do not use the older style of "MOD/mod.rs" for separing modules and instead use the "MOD.rs" filename convention. -Refer to "The Rust Programming Language" book (2024 version) and "Command line apps in Rust" documentation for in-depth information on best practices, and advanced features. +Async Ecosystem +- Use `tokio` for async runtime and task management. +- Leverage `hyper` or `reqwest` for async HTTP requests. +- Use `serde` for serialization/deserialization. + + +## Tower crate + +- Tower is a library of modular and reusable components for building networking clients and servers. +- Tower provides a simple core abstraction, the Service trait, which represents an asynchronous function taking a request and returning either a response or an error. This abstraction can be used to model both clients and servers. +```rust +pub trait Service { + type Response; + type Error; + type Future: Future>; + + // Required methods + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>; + fn call(&mut self, req: Request) -> Self::Future; +} +``` +- Generic components, like timeout, rate limiting, and load balancing, can be modeled as Services that wrap some inner service and apply additional behavior before or after the inner service is called. This allows implementing these components in a protocol-agnostic, composable way. Typically, such services are referred to as middleware. +- An additional abstraction, the Layer trait, is used to compose middleware with Services. If a Service can be thought of as an asynchronous function from a request type to a response type, a Layer is a function taking a Service of one type and returning a Service of a different type. The ServiceBuilder type is used to add middleware to a service by composing it with multiple Layers. + +Here is an example of using tower service with timeout: +``` +use tokio::time::{sleep, Duration}; +use tower::{Service, ServiceBuilder}; +use std::task::{Context, Poll}; +use futures::future::{BoxFuture, FutureExt}; + +// Define a simple service that takes some time to respond +struct SlowService; + +impl Service for SlowService { + type Response = String; + type Error = &'static str; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: String) -> Self::Future { + println!("Processing request: {}", req); + + // Use an async block to create the future + async move { + // Simulate a slow response + sleep(Duration::from_secs(3)).await; + Ok(format!("Processed: {}", req)) + } + .boxed() // Convert the future into a BoxFuture + } +} + +#[tokio::main] +async fn main() { + // Create the base service + let service = SlowService; + + // Wrap the service with a timeout layer + let timeout_service = ServiceBuilder::new() + .timeout(Duration::from_secs(1)) + .service(service); + + // Use the timeout-wrapped service + let mut svc = timeout_service; + + match svc.call("Hello Tower!".to_string()).await { + Ok(response) => println!("Response: {}", response), + Err(err) => { + if let Some(_elapsed) = err.downcast_ref::() { + println!("Error: Timed out"); + } else { + println!("Error: {:?}", err); + } + } + } +} +``` + +## Actors in Tokio + +- An actor is a self-contained task (usually spawned by an async runtime like Tokio) that runs independently and communicates with other parts of the system by sending and receiving messages. + - Each actor encapsulates some resource or job. + - Other tasks/actors access it by sending messages rather than by directly sharing data. + +- An actor is split into two parts: + 1. The Task – A run function or method that processes incoming messages in a loop, shutting down gracefully when no more messages can be received. + 2. The Handle – A struct that owns a mpsc::Sender (or other channel) and exposes async methods to send messages to the actor. + +A minimal actor example might look like this: +``` +use tokio::sync::{oneshot, mpsc}; + +// Message type(s) the actor can receive. +enum ActorMessage { + GetUniqueId { + respond_to: oneshot::Sender, + }, +} + +// The actor’s internal state and logic. +struct MyActor { + receiver: mpsc::Receiver, + next_id: u32, +} + +impl MyActor { + fn new(receiver: mpsc::Receiver) -> Self { + Self { receiver, next_id: 0 } + } + + fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::GetUniqueId { respond_to } => { + self.next_id += 1; + let _ = respond_to.send(self.next_id); // Ignore send errors + } + } + } +} + +// The actor’s main event loop. +async fn run_my_actor(mut actor: MyActor) { + while let Some(msg) = actor.receiver.recv().await { + actor.handle_message(msg); + } +} + +// The "handle": provides methods to send messages to the actor. +#[derive(Clone)] +struct MyActorHandle { + sender: mpsc::Sender, +} + +impl MyActorHandle { + // Creates both the sender and the spawned actor task. + pub fn new() -> Self { + let (sender, receiver) = mpsc::channel(8); + let actor = MyActor::new(receiver); + tokio::spawn(run_my_actor(actor)); + + Self { sender } + } + + // Example request-response method. + pub async fn get_unique_id(&self) -> u32 { + let (send, recv) = oneshot::channel(); + let msg = ActorMessage::GetUniqueId { respond_to: send }; + let _ = self.sender.send(msg).await; // Sends the message to the actor + recv.await.expect("Actor task was killed") + } +} +``` From 509d3fe324b216e6808d96b40cc44fc58aaa47ea Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 11:51:55 -0500 Subject: [PATCH 03/12] stashing changes - compilation is okay, example does not run --- crates/mcp-client/examples/stdio.rs | 17 +++-- crates/mcp-client/examples/svc.rs | 4 +- crates/mcp-client/src/client.rs | 35 ++++++----- crates/mcp-client/src/lib.rs | 5 ++ crates/mcp-client/src/service.rs | 46 ++++++++++++++ crates/mcp-client/src/transport/mod.rs | 79 ++++++++++-------------- crates/mcp-client/src/transport/sse.rs | 20 +++++- crates/mcp-client/src/transport/stdio.rs | 20 +++++- 8 files changed, 150 insertions(+), 76 deletions(-) create mode 100644 crates/mcp-client/src/service.rs diff --git a/crates/mcp-client/examples/stdio.rs b/crates/mcp-client/examples/stdio.rs index 52d7ee141..4a202b4b2 100644 --- a/crates/mcp-client/examples/stdio.rs +++ b/crates/mcp-client/examples/stdio.rs @@ -1,6 +1,9 @@ use anyhow::Result; -use mcp_client::client::{ClientCapabilities, ClientInfo, Error as ClientError, McpClient}; -use mcp_client::transport::{StdioTransport, Transport}; +use mcp_client::{ + ClientCapabilities, ClientInfo, Error as ClientError, McpClient, McpService, StdioTransport, +}; +use std::time::Duration; +use tower::ServiceBuilder; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -20,8 +23,14 @@ async fn main() -> Result<(), ClientError> { // 2) Start the transport to get a handle let transport_handle = transport.start().await?; - // 3) Create the client - let mut client = McpClient::new(transport_handle); + // 3) Create the service with timeout middleware + let service = McpService::new(transport_handle); + let service = ServiceBuilder::new() + .timeout(Duration::from_secs(30)) + .service(service); + + // 4) Create the client with the middleware-wrapped service + let mut client = McpClient::new(service); // Initialize let server_info = client diff --git a/crates/mcp-client/examples/svc.rs b/crates/mcp-client/examples/svc.rs index cf0952e14..e889bc1af 100644 --- a/crates/mcp-client/examples/svc.rs +++ b/crates/mcp-client/examples/svc.rs @@ -1,7 +1,7 @@ +use futures::future::{BoxFuture, FutureExt}; +use std::task::{Context, Poll}; use tokio::time::{sleep, Duration}; use tower::{Service, ServiceBuilder}; -use std::task::{Context, Poll}; -use futures::future::{BoxFuture, FutureExt}; // Define a simple service that takes some time to respond struct SlowService; diff --git a/crates/mcp-client/src/client.rs b/crates/mcp-client/src/client.rs index 81e646108..0253117b9 100644 --- a/crates/mcp-client/src/client.rs +++ b/crates/mcp-client/src/client.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use thiserror::Error; use tokio::sync::Mutex; -use tower::{Service, ServiceExt}; // for Service::ready() +use tower::Service; /// Error type for MCP client operations. #[derive(Debug, Error)] @@ -55,19 +55,26 @@ pub struct InitializeParams { } /// The MCP client is the interface for MCP operations. -pub struct McpClient { - service: Mutex, +pub struct McpClient +where + S: Service + Clone + Send + 'static, + S::Error: Into, +{ + service: S, next_id: AtomicU64, server_capabilities: Option, } -impl McpClient { - pub fn new(transport_handle: TransportHandle) -> Self { - // Takes TransportHandle directly +impl McpClient +where + S: Service + Clone + Send + 'static, + S::Error: Into, +{ + pub fn new(service: S) -> Self { Self { - service: Mutex::new(transport_handle), + service, next_id: AtomicU64::new(1), - server_capabilities: None, // set during initialization + server_capabilities: None, } } @@ -76,9 +83,6 @@ impl McpClient { where R: for<'de> Deserialize<'de>, { - let mut service = self.service.lock().await; - service.ready().await.map_err(|_| Error::NotReady)?; - let id = self.next_id.fetch_add(1, Ordering::SeqCst); let request = JsonRpcMessage::Request(JsonRpcRequest { jsonrpc: "2.0".to_string(), @@ -87,7 +91,8 @@ impl McpClient { params: Some(params), }); - let response_msg = service.call(request).await?; + let mut service = self.service.clone(); + let response_msg = service.call(request).await.map_err(Into::into)?; match response_msg { JsonRpcMessage::Response(JsonRpcResponse { @@ -126,16 +131,14 @@ impl McpClient { /// Send a JSON-RPC notification. async fn send_notification(&self, method: &str, params: Value) -> Result<(), Error> { - let mut service = self.service.lock().await; - service.ready().await.map_err(|_| Error::NotReady)?; - let notification = JsonRpcMessage::Notification(JsonRpcNotification { jsonrpc: "2.0".to_string(), method: method.to_string(), params: Some(params), }); - service.call(notification).await?; + let mut service = self.service.clone(); + service.call(notification).await.map_err(Into::into)?; Ok(()) } diff --git a/crates/mcp-client/src/lib.rs b/crates/mcp-client/src/lib.rs index ca0e0203e..e1efb60ed 100644 --- a/crates/mcp-client/src/lib.rs +++ b/crates/mcp-client/src/lib.rs @@ -1,2 +1,7 @@ pub mod client; +pub mod service; pub mod transport; + +pub use client::{ClientCapabilities, ClientInfo, Error, McpClient}; +pub use service::McpService; +pub use transport::{SseTransport, StdioTransport, Transport, TransportHandle}; diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs new file mode 100644 index 000000000..a508b2b2c --- /dev/null +++ b/crates/mcp-client/src/service.rs @@ -0,0 +1,46 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::future::BoxFuture; +use mcp_core::protocol::JsonRpcMessage; +use tower::Service; + +use crate::transport::{Error, TransportHandle}; + +/// A wrapper service that implements Tower's Service trait for MCP transport +#[derive(Clone)] +pub struct McpService { + inner: T, +} + +impl McpService { + pub fn new(transport: T) -> Self { + Self { inner: transport } + } + + pub fn into_inner(self) -> T { + self.inner + } +} + +impl Service for McpService +where + T: TransportHandle, +{ + type Response = JsonRpcMessage; + type Error = Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Most transports are always ready, but this could be customized if needed + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: JsonRpcMessage) -> Self::Future { + let transport = self.inner.clone(); + Box::pin(async move { transport.send(request).await }) + } +} diff --git a/crates/mcp-client/src/transport/mod.rs b/crates/mcp-client/src/transport/mod.rs index a10c1c9f0..7e577193a 100644 --- a/crates/mcp-client/src/transport/mod.rs +++ b/crates/mcp-client/src/transport/mod.rs @@ -6,10 +6,10 @@ use std::{ }; use async_trait::async_trait; +use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; -use tower::Service; /// A generic error type for transport operations. #[derive(Debug, Error)] @@ -59,63 +59,46 @@ pub struct TransportMessage { /// A generic asynchronous transport trait with channel-based communication #[async_trait] -pub trait Transport: Send + Sync + 'static { +pub trait Transport { + type Handle: TransportHandle; + /// Start the transport and establish the underlying connection. /// Returns the transport handle for sending messages. - async fn start(&self) -> Result; + async fn start(&self) -> Result; /// Close the transport and free any resources. async fn close(&self) -> Result<(), Error>; } -#[derive(Clone)] -pub struct TransportHandle { - sender: mpsc::Sender, +#[async_trait] +pub trait TransportHandle: Send + Sync + Clone + 'static { + async fn send(&self, message: JsonRpcMessage) -> Result; } -impl TransportHandle { - pub async fn send(&self, message: JsonRpcMessage) -> Result { - match message { - JsonRpcMessage::Request(request) => { - let (respond_to, response) = oneshot::channel(); - let msg = TransportMessage { - message: JsonRpcMessage::Request(request), - response_tx: Some(respond_to), - }; - self.sender - .send(msg) - .await - .map_err(|_| Error::ChannelClosed)?; - Ok(response.await.map_err(|_| Error::ChannelClosed)??) - } - JsonRpcMessage::Notification(notification) => { - let msg = TransportMessage { - message: JsonRpcMessage::Notification(notification), - response_tx: None, - }; - self.sender - .send(msg) - .await - .map_err(|_| Error::ChannelClosed)?; - Ok(JsonRpcMessage::Nil) // Explicitly return None for notifications - } - _ => Err(Error::Other("Unsupported message type".to_string())), +// Helper function that contains the common send implementation +pub async fn send_message( + sender: &mpsc::Sender, + message: JsonRpcMessage, +) -> Result { + match message { + JsonRpcMessage::Request(request) => { + let (respond_to, response) = oneshot::channel(); + let msg = TransportMessage { + message: JsonRpcMessage::Request(request), + response_tx: Some(respond_to), + }; + sender.send(msg).await.map_err(|_| Error::ChannelClosed)?; + Ok(response.await.map_err(|_| Error::ChannelClosed)??) } - } -} - -impl Service for TransportHandle { - type Response = JsonRpcMessage; - type Error = Error; // Using Transport's Error directly - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, message: JsonRpcMessage) -> Self::Future { - let this = self.clone(); - Box::pin(async move { this.send(message).await }) + JsonRpcMessage::Notification(notification) => { + let msg = TransportMessage { + message: JsonRpcMessage::Notification(notification), + response_tx: None, + }; + sender.send(msg).await.map_err(|_| Error::ChannelClosed)?; + Ok(JsonRpcMessage::Nil) + } + _ => Err(Error::Other("Unsupported message type".to_string())), } } diff --git a/crates/mcp-client/src/transport/sse.rs b/crates/mcp-client/src/transport/sse.rs index 74e4ab844..6679f6322 100644 --- a/crates/mcp-client/src/transport/sse.rs +++ b/crates/mcp-client/src/transport/sse.rs @@ -9,7 +9,7 @@ use tokio::sync::{mpsc, RwLock}; use tokio::time::{timeout, Duration}; use tracing::warn; -use super::{Transport, TransportHandle}; +use super::{send_message, Transport, TransportHandle}; // Timeout for the endpoint discovery const ENDPOINT_TIMEOUT_SECS: u64 = 5; @@ -202,6 +202,18 @@ impl SseActor { } } +#[derive(Clone)] +pub struct SseTransportHandle { + sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl TransportHandle for SseTransportHandle { + async fn send(&self, message: JsonRpcMessage) -> Result { + send_message(&self.sender, message).await + } +} + #[derive(Clone)] pub struct SseTransport { sse_url: String, @@ -237,7 +249,9 @@ impl SseTransport { #[async_trait] impl Transport for SseTransport { - async fn start(&self) -> Result { + type Handle = SseTransportHandle; + + async fn start(&self) -> Result { // Create a channel for outgoing TransportMessages let (tx, rx) = mpsc::channel(32); @@ -262,7 +276,7 @@ impl Transport for SseTransport { ) .await { - Ok(_) => Ok(TransportHandle { sender: tx }), + Ok(_) => Ok(SseTransportHandle { sender: tx }), Err(e) => Err(Error::SseConnection(e.to_string())), } } diff --git a/crates/mcp-client/src/transport/stdio.rs b/crates/mcp-client/src/transport/stdio.rs index c983342bb..5185f1464 100644 --- a/crates/mcp-client/src/transport/stdio.rs +++ b/crates/mcp-client/src/transport/stdio.rs @@ -6,7 +6,7 @@ use mcp_core::protocol::JsonRpcMessage; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::mpsc; -use super::{Error, PendingRequests, Transport, TransportHandle, TransportMessage}; +use super::{send_message, Error, PendingRequests, Transport, TransportHandle, TransportMessage}; /// A `StdioTransport` uses a child process's stdin/stdout as a communication channel. /// @@ -100,6 +100,18 @@ impl StdioActor { } } +#[derive(Clone)] +pub struct StdioTransportHandle { + sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl TransportHandle for StdioTransportHandle { + async fn send(&self, message: JsonRpcMessage) -> Result { + send_message(&self.sender, message).await + } +} + pub struct StdioTransport { command: String, args: Vec, @@ -141,7 +153,9 @@ impl StdioTransport { #[async_trait] impl Transport for StdioTransport { - async fn start(&self) -> Result { + type Handle = StdioTransportHandle; + + async fn start(&self) -> Result { let (process, stdin, stdout) = self.spawn_process().await?; let (message_tx, message_rx) = mpsc::channel(32); @@ -155,7 +169,7 @@ impl Transport for StdioTransport { tokio::spawn(actor.run()); - let handle = TransportHandle { sender: message_tx }; + let handle = StdioTransportHandle { sender: message_tx }; Ok(handle) } From 74f32467cf6ea06ba2e5f89ff112164377772d93 Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 12:06:03 -0500 Subject: [PATCH 04/12] example stdio.rs is working --- crates/mcp-client/examples/stdio.rs | 7 ++----- crates/mcp-client/src/client.rs | 11 +++++++++-- crates/mcp-client/src/service.rs | 21 ++++++++++++++++++++- crates/mcp-client/src/transport/mod.rs | 9 +++++++++ 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/crates/mcp-client/examples/stdio.rs b/crates/mcp-client/examples/stdio.rs index 4a202b4b2..7ae844e32 100644 --- a/crates/mcp-client/examples/stdio.rs +++ b/crates/mcp-client/examples/stdio.rs @@ -1,9 +1,9 @@ use anyhow::Result; use mcp_client::{ ClientCapabilities, ClientInfo, Error as ClientError, McpClient, McpService, StdioTransport, + Transport, }; use std::time::Duration; -use tower::ServiceBuilder; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -24,10 +24,7 @@ async fn main() -> Result<(), ClientError> { let transport_handle = transport.start().await?; // 3) Create the service with timeout middleware - let service = McpService::new(transport_handle); - let service = ServiceBuilder::new() - .timeout(Duration::from_secs(30)) - .service(service); + let service = McpService::with_timeout(transport_handle, Duration::from_secs(30)); // 4) Create the client with the middleware-wrapped service let mut client = McpClient::new(service); diff --git a/crates/mcp-client/src/client.rs b/crates/mcp-client/src/client.rs index 0253117b9..3b1e60f23 100644 --- a/crates/mcp-client/src/client.rs +++ b/crates/mcp-client/src/client.rs @@ -1,6 +1,5 @@ use std::sync::atomic::{AtomicU64, Ordering}; -use crate::transport::TransportHandle; use mcp_core::protocol::{ CallToolResult, InitializeResult, JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, ListResourcesResult, ListToolsResult, ReadResourceResult, @@ -9,7 +8,6 @@ use mcp_core::protocol::{ use serde::{Deserialize, Serialize}; use serde_json::Value; use thiserror::Error; -use tokio::sync::Mutex; use tower::Service; /// Error type for MCP client operations. @@ -32,6 +30,15 @@ pub enum Error { #[error("Timeout or service not ready")] NotReady, + + #[error("Box error: {0}")] + BoxError(Box), +} + +impl From> for Error { + fn from(err: Box) -> Self { + Error::BoxError(err) + } } #[derive(Serialize, Deserialize)] diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs index a508b2b2c..b9a9611ae 100644 --- a/crates/mcp-client/src/service.rs +++ b/crates/mcp-client/src/service.rs @@ -6,7 +6,7 @@ use std::{ use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; -use tower::Service; +use tower::{timeout::Timeout, Service, ServiceBuilder}; use crate::transport::{Error, TransportHandle}; @@ -44,3 +44,22 @@ where Box::pin(async move { transport.send(request).await }) } } + +// Add a convenience constructor for creating a service with timeout +impl McpService +where + T: TransportHandle, +{ + pub fn with_timeout(transport: T, timeout: std::time::Duration) -> Timeout> { + ServiceBuilder::new() + .timeout(timeout) + .service(McpService::new(transport)) + } +} + +// Implement From for our Error type +impl From for Error { + fn from(_: tower::timeout::error::Elapsed) -> Self { + Error::Timeout + } +} diff --git a/crates/mcp-client/src/transport/mod.rs b/crates/mcp-client/src/transport/mod.rs index 7e577193a..997fe602d 100644 --- a/crates/mcp-client/src/transport/mod.rs +++ b/crates/mcp-client/src/transport/mod.rs @@ -46,6 +46,15 @@ pub enum Error { #[error("Unexpected transport error: {0}")] Other(String), + + #[error("Box error: {0}")] + BoxError(Box), +} + +impl From> for Error { + fn from(err: Box) -> Self { + Error::BoxError(err) + } } /// A message that can be sent through the transport From 2f84e9605b959757728fa97eb715a2eaa44e525d Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 12:19:26 -0500 Subject: [PATCH 05/12] stash changes. all examples work except examples/clients.rs --- crates/mcp-client/examples/clients.rs | 10 +++++++--- crates/mcp-client/examples/sse.rs | 6 +++++- crates/mcp-client/examples/stdio.rs | 2 +- crates/mcp-client/examples/stdio_integration.rs | 8 +++++++- crates/mcp-client/src/service.rs | 7 +------ crates/mcp-client/src/transport/mod.rs | 9 +-------- crates/mcp-server/src/main.rs | 2 +- 7 files changed, 23 insertions(+), 21 deletions(-) diff --git a/crates/mcp-client/examples/clients.rs b/crates/mcp-client/examples/clients.rs index 0f2080858..6607710fe 100644 --- a/crates/mcp-client/examples/clients.rs +++ b/crates/mcp-client/examples/clients.rs @@ -1,6 +1,6 @@ use mcp_client::{ client::{ClientCapabilities, ClientInfo, McpClient}, - transport::{SseTransport, StdioTransport, Transport}, + transport::{SseTransport, StdioTransport, Transport}, McpService, }; use rand::Rng; use rand::SeedableRng; @@ -123,7 +123,9 @@ async fn create_stdio_client( _version: &str, ) -> Result> { let transport = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()]); - Ok(McpClient::new(transport.start().await?)) + let handle = transport.start().await?; + let service = McpService::with_timeout(handle, Duration::from_secs(10)); + Ok(McpClient::new(service)) } async fn create_sse_client( @@ -131,5 +133,7 @@ async fn create_sse_client( _version: &str, ) -> Result> { let transport = SseTransport::new("http://localhost:8000/sse"); - Ok(McpClient::new(transport.start().await?)) + let handle = transport.start().await?; + let service = McpService::with_timeout(handle, Duration::from_secs(3)); + Ok(McpClient::new(service)) } diff --git a/crates/mcp-client/examples/sse.rs b/crates/mcp-client/examples/sse.rs index f6c79563c..093116ee5 100644 --- a/crates/mcp-client/examples/sse.rs +++ b/crates/mcp-client/examples/sse.rs @@ -1,6 +1,7 @@ use anyhow::Result; use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient}; use mcp_client::transport::{SseTransport, Transport}; +use mcp_client::McpService; use std::time::Duration; use tracing_subscriber::EnvFilter; @@ -21,8 +22,11 @@ async fn main() -> Result<()> { // Start transport let handle = transport.start().await?; + // Create the service with timeout middleware + let service = McpService::with_timeout(handle, Duration::from_secs(3)); + // Create client - let mut client = McpClient::new(handle); + let mut client = McpClient::new(service); println!("Client created\n"); // Initialize diff --git a/crates/mcp-client/examples/stdio.rs b/crates/mcp-client/examples/stdio.rs index 7ae844e32..e57ea10d0 100644 --- a/crates/mcp-client/examples/stdio.rs +++ b/crates/mcp-client/examples/stdio.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), ClientError> { let transport_handle = transport.start().await?; // 3) Create the service with timeout middleware - let service = McpService::with_timeout(transport_handle, Duration::from_secs(30)); + let service = McpService::with_timeout(transport_handle, Duration::from_secs(10)); // 4) Create the client with the middleware-wrapped service let mut client = McpClient::new(service); diff --git a/crates/mcp-client/examples/stdio_integration.rs b/crates/mcp-client/examples/stdio_integration.rs index 4b2da6cee..183dfcb9f 100644 --- a/crates/mcp-client/examples/stdio_integration.rs +++ b/crates/mcp-client/examples/stdio_integration.rs @@ -1,8 +1,11 @@ +use std::time::Duration; + // This example shows how to use the mcp-client crate to interact with a server that has a simple counter tool. // The server is started by running `cargo run -p mcp-server` in the root of the mcp-server crate. use anyhow::Result; use mcp_client::client::{ClientCapabilities, ClientInfo, Error as ClientError, McpClient}; use mcp_client::transport::{StdioTransport, Transport}; +use mcp_client::McpService; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -28,8 +31,11 @@ async fn main() -> Result<(), ClientError> { // Start the transport to get a handle let transport_handle = transport.start().await.unwrap(); + // Create the service with timeout middleware + let service = McpService::with_timeout(transport_handle, Duration::from_secs(10)); + // Create client - let mut client = McpClient::new(transport_handle); + let mut client = McpClient::new(service); // Initialize let server_info = client diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs index b9a9611ae..3dde15205 100644 --- a/crates/mcp-client/src/service.rs +++ b/crates/mcp-client/src/service.rs @@ -1,9 +1,4 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - +use std::task::{Context, Poll}; use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; use tower::{timeout::Timeout, Service, ServiceBuilder}; diff --git a/crates/mcp-client/src/transport/mod.rs b/crates/mcp-client/src/transport/mod.rs index 997fe602d..00ca52ba3 100644 --- a/crates/mcp-client/src/transport/mod.rs +++ b/crates/mcp-client/src/transport/mod.rs @@ -1,12 +1,5 @@ -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - +use std::collections::HashMap; use async_trait::async_trait; -use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; diff --git a/crates/mcp-server/src/main.rs b/crates/mcp-server/src/main.rs index aa9a9f93a..b89c44af7 100644 --- a/crates/mcp-server/src/main.rs +++ b/crates/mcp-server/src/main.rs @@ -58,7 +58,7 @@ impl Router for CounterRouter { } fn capabilities(&self) -> ServerCapabilities { - CapabilitiesBuilder::new().with_tools(true).build() + CapabilitiesBuilder::new().with_tools(false).with_resources(false, false).build() } fn list_tools(&self) -> Vec { From a2b2988c57a5b62c2a2696e5b7bd30adce1c0a47 Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 14:52:31 -0500 Subject: [PATCH 06/12] use McpClientTrait to make a collection of clients easier --- crates/mcp-client/examples/clients.rs | 46 ++++++++++-------------- crates/mcp-client/examples/sse.rs | 2 +- crates/mcp-client/examples/stdio.rs | 4 +-- crates/mcp-client/src/client.rs | 50 ++++++++++++++++++++------ crates/mcp-client/src/lib.rs | 2 +- crates/mcp-client/src/service.rs | 4 +-- crates/mcp-client/src/transport/mod.rs | 2 +- 7 files changed, 65 insertions(+), 45 deletions(-) diff --git a/crates/mcp-client/examples/clients.rs b/crates/mcp-client/examples/clients.rs index 6607710fe..a33e0a792 100644 --- a/crates/mcp-client/examples/clients.rs +++ b/crates/mcp-client/examples/clients.rs @@ -1,6 +1,7 @@ use mcp_client::{ - client::{ClientCapabilities, ClientInfo, McpClient}, - transport::{SseTransport, StdioTransport, Transport}, McpService, + client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait}, + transport::{SseTransport, StdioTransport, Transport}, + McpService, }; use rand::Rng; use rand::SeedableRng; @@ -16,14 +17,25 @@ async fn main() -> Result<(), Box> { EnvFilter::from_default_env().add_directive("mcp_client=debug".parse().unwrap()), ) .init(); + + let transport1 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()]); + let handle1 = transport1.start().await?; + let service1 = McpService::with_timeout(handle1, Duration::from_secs(30)); + let client1 = McpClient::new(service1); - // Create two separate clients with stdio transport - let client1 = create_stdio_client("client1", "1.0.0").await?; - let client2 = create_stdio_client("client2", "1.0.0").await?; - let client3 = create_sse_client("client3", "1.0.0").await?; + let transport2 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()]); + let handle2 = transport2.start().await?; + let service2 = McpService::with_timeout(handle2, Duration::from_secs(30)); + let client2 = McpClient::new(service2); + + let transport3 = SseTransport::new("http://localhost:8000/sse"); + let handle3 = transport3.start().await?; + let service3 = McpService::with_timeout(handle3, Duration::from_secs(3)); + let client3 = McpClient::new(service3); // Initialize both clients - let mut clients = vec![client1, client2, client3]; + let mut clients: Vec> = + vec![Box::new(client1), Box::new(client2), Box::new(client3)]; // Initialize all clients for (i, client) in clients.iter_mut().enumerate() { @@ -117,23 +129,3 @@ async fn main() -> Result<(), Box> { Ok(()) } - -async fn create_stdio_client( - _name: &str, - _version: &str, -) -> Result> { - let transport = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()]); - let handle = transport.start().await?; - let service = McpService::with_timeout(handle, Duration::from_secs(10)); - Ok(McpClient::new(service)) -} - -async fn create_sse_client( - _name: &str, - _version: &str, -) -> Result> { - let transport = SseTransport::new("http://localhost:8000/sse"); - let handle = transport.start().await?; - let service = McpService::with_timeout(handle, Duration::from_secs(3)); - Ok(McpClient::new(service)) -} diff --git a/crates/mcp-client/examples/sse.rs b/crates/mcp-client/examples/sse.rs index 093116ee5..65a553fd5 100644 --- a/crates/mcp-client/examples/sse.rs +++ b/crates/mcp-client/examples/sse.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient}; +use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait}; use mcp_client::transport::{SseTransport, Transport}; use mcp_client::McpService; use std::time::Duration; diff --git a/crates/mcp-client/examples/stdio.rs b/crates/mcp-client/examples/stdio.rs index e57ea10d0..be2e43806 100644 --- a/crates/mcp-client/examples/stdio.rs +++ b/crates/mcp-client/examples/stdio.rs @@ -1,7 +1,7 @@ use anyhow::Result; use mcp_client::{ - ClientCapabilities, ClientInfo, Error as ClientError, McpClient, McpService, StdioTransport, - Transport, + ClientCapabilities, ClientInfo, Error as ClientError, McpClient, McpClientTrait, McpService, + StdioTransport, Transport, }; use std::time::Duration; use tracing_subscriber::EnvFilter; diff --git a/crates/mcp-client/src/client.rs b/crates/mcp-client/src/client.rs index 3b1e60f23..58813e7e8 100644 --- a/crates/mcp-client/src/client.rs +++ b/crates/mcp-client/src/client.rs @@ -61,11 +61,29 @@ pub struct InitializeParams { pub client_info: ClientInfo, } +#[async_trait::async_trait] +pub trait McpClientTrait: Send + Sync { + async fn initialize( + &mut self, + info: ClientInfo, + capabilities: ClientCapabilities, + ) -> Result; + + async fn list_resources(&self) -> Result; + + async fn read_resource(&self, uri: &str) -> Result; + + async fn list_tools(&self) -> Result; + + async fn call_tool(&self, name: &str, arguments: Value) -> Result; +} + /// The MCP client is the interface for MCP operations. pub struct McpClient where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + Sync + 'static, S::Error: Into, + S::Future: Send, { service: S, next_id: AtomicU64, @@ -74,8 +92,9 @@ where impl McpClient where - S: Service + Clone + Send + 'static, + S: Service + Clone + Send + Sync + 'static, S::Error: Into, + S::Future: Send, { pub fn new(service: S) -> Self { Self { @@ -149,7 +168,20 @@ where Ok(()) } - pub async fn initialize( + // Check if the client has completed initialization + fn completed_initialization(&self) -> bool { + self.server_capabilities.is_some() + } +} + +#[async_trait::async_trait] +impl McpClientTrait for McpClient +where + S: Service + Clone + Send + Sync + 'static, + S::Error: Into, + S::Future: Send, +{ + async fn initialize( &mut self, info: ClientInfo, capabilities: ClientCapabilities, @@ -171,11 +203,7 @@ where Ok(result) } - fn completed_initialization(&self) -> bool { - self.server_capabilities.is_some() - } - - pub async fn list_resources(&self) -> Result { + async fn list_resources(&self) -> Result { if !self.completed_initialization() { return Err(Error::NotInitialized); } @@ -194,7 +222,7 @@ where .await } - pub async fn read_resource(&self, uri: &str) -> Result { + async fn read_resource(&self, uri: &str) -> Result { if !self.completed_initialization() { return Err(Error::NotInitialized); } @@ -216,7 +244,7 @@ where self.send_request("resources/read", params).await } - pub async fn list_tools(&self) -> Result { + async fn list_tools(&self) -> Result { if !self.completed_initialization() { return Err(Error::NotInitialized); } @@ -228,7 +256,7 @@ where self.send_request("tools/list", serde_json::json!({})).await } - pub async fn call_tool(&self, name: &str, arguments: Value) -> Result { + async fn call_tool(&self, name: &str, arguments: Value) -> Result { if !self.completed_initialization() { return Err(Error::NotInitialized); } diff --git a/crates/mcp-client/src/lib.rs b/crates/mcp-client/src/lib.rs index e1efb60ed..985d89d16 100644 --- a/crates/mcp-client/src/lib.rs +++ b/crates/mcp-client/src/lib.rs @@ -2,6 +2,6 @@ pub mod client; pub mod service; pub mod transport; -pub use client::{ClientCapabilities, ClientInfo, Error, McpClient}; +pub use client::{ClientCapabilities, ClientInfo, Error, McpClient, McpClientTrait}; pub use service::McpService; pub use transport::{SseTransport, StdioTransport, Transport, TransportHandle}; diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs index 3dde15205..946233d31 100644 --- a/crates/mcp-client/src/service.rs +++ b/crates/mcp-client/src/service.rs @@ -1,6 +1,6 @@ -use std::task::{Context, Poll}; use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; +use std::task::{Context, Poll}; use tower::{timeout::Timeout, Service, ServiceBuilder}; use crate::transport::{Error, TransportHandle}; @@ -23,7 +23,7 @@ impl McpService { impl Service for McpService where - T: TransportHandle, + T: TransportHandle + Send + Sync + 'static, { type Response = JsonRpcMessage; type Error = Error; diff --git a/crates/mcp-client/src/transport/mod.rs b/crates/mcp-client/src/transport/mod.rs index 00ca52ba3..aaf8a7f58 100644 --- a/crates/mcp-client/src/transport/mod.rs +++ b/crates/mcp-client/src/transport/mod.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use async_trait::async_trait; use mcp_core::protocol::JsonRpcMessage; +use std::collections::HashMap; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; From 42f14290c36ff48457122b7c04cba840c2bb761d Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 14:53:49 -0500 Subject: [PATCH 07/12] revert goosehints and cursorrules --- .cursorrules | 276 ++------------------------------------------------- .goosehints | 275 ++------------------------------------------------ 2 files changed, 12 insertions(+), 539 deletions(-) diff --git a/.cursorrules b/.cursorrules index 22b7abb48..4eaa69991 100644 --- a/.cursorrules +++ b/.cursorrules @@ -1,86 +1,8 @@ -Anthropic just released Model Context Protocol (MCP), a new standard for connecting AI assistants to the systems where data lives, including content repositories, business tools, and development environments. The Model Context Protocol is an open standard that enables developers to build secure, two-way connections between their data sources and AI-powered tools. The architecture is straightforward: developers can either expose their data through MCP servers or build AI applications (MCP clients) that connect to these servers. - -Model Context Protocol (MCP), is heavily inspired by Microsoft's Language Server Protocol (LSP) used in IDEs. - -# MCP Protocol - -## Lifecycle - -The Model Context Protocol (MCP) defines a rigorous lifecycle for client-server connections that ensures proper capability negotiation and state management. - -3 phases: -1. Initialization: Capability negotiation and protocol version agreement -2. Operation: Normal protocol communication -3. Shutdown: Graceful termination of the connection - - -### Initialization -The initialization phase MUST be the first interaction between client and server. During this phase, the client and server: -- Establish protocol version compatibility -- Exchange and negotiate capabilities -- Share implementation details - -The client MUST initiate this phase by sending an initialize request containing: -- Protocol version supported -- Client capabilities -- Client implementation information - -### Operation -During the operation phase, the client and server exchange messages according to the negotiated capabilities. - -Both parties SHOULD: -- Respect the negotiated protocol version -- Only use capabilities that were successfully negotiated - -### Shutdown -During the shutdown phase, one side (usually the client) cleanly terminates the protocol connection. No specific shutdown messages are defined—instead, the underlying transport mechanism should be used to signal connection termination: - -#### stdio -For the stdio transport, the client SHOULD initiate shutdown by: - -1. First, closing the input stream to the child process (the server) -2. Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time -3. Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM - -The server MAY initiate shutdown by closing its output stream to the client and exiting. - -#### HTTP with SSE - -For HTTP transports, shutdown is indicated by closing the associated HTTP connection(s). - - -## Transports - -2 standard transport mechanisms for client-server communication: - -1. stdio, communication over standard in and standard out -2. HTTP with Server-Sent Events (SSE) - -### stdio - -- The client launches the MCP server as a subprocess. -- The server receives JSON-RPC messages on its standard input (stdin) and writes responses to its standard output (stdout). -- Messages are delimited by newlines, and MUST NOT contain embedded newlines. -- The server MAY write UTF-8 strings to its standard error (stderr) for logging purposes. Clients MAY capture, forward, or ignore this logging. -- The server MUST NOT write anything to its stdout that is not a valid MCP message. -- The client MUST NOT write anything to the server’s stdin that is not a valid MCP message. - -### HTTP with SSE - -In the SSE transport, the server operates as an independent process that can handle multiple client connections. - -The server MUST provide two endpoints: -1. An SSE endpoint, for clients to establish a connection and receive messages from the server -2. A regular HTTP POST endpoint for clients to send messages to the server - -When a client connects, the server MUST send an endpoint event containing a URI for the client to use for sending messages. All subsequent client messages MUST be sent as HTTP POST requests to this endpoint. - -Server messages are sent as SSE message events, with the message content encoded as JSON in the event data. - ------------------- - -# General Guidelines for writing Rust code +You are an expert programmer in Rust teaching who is teaching another developer who is learning Rust. +The students are familiar with programming in languages such as Python (advanced), Java (novice) and C (novice) so +when possible use analogies from those languages. +Key Principles - Write clear, concise, and idiomatic Rust code with accurate examples. - Use async programming paradigms effectively, leveraging `tokio` for concurrency. - Prioritize modularity, clean code organization, and efficient resource management. @@ -89,21 +11,6 @@ Server messages are sent as SSE message events, with the message content encoded - Avoid code duplication; use functions and modules to encapsulate reusable logic. - Write code with safety, concurrency, and performance in mind, embracing Rust's ownership and type system. -Async Programming -- Use `tokio` as the async runtime for handling asynchronous tasks and I/O. -- Implement async functions using `async fn` syntax. -- Leverage `tokio::spawn` for task spawning and concurrency. -- Use `tokio::select!` for managing multiple async tasks and cancellations. -- Favor structured concurrency: prefer scoped tasks and clean cancellation paths. -- Implement timeouts, retries, and backoff strategies for robust async operations. - -Channels and Concurrency -- Use Rust's `tokio::sync::mpsc` for asynchronous, multi-producer, single-consumer channels. -- Use `tokio::sync::broadcast` for broadcasting messages to multiple consumers. -- Implement `tokio::sync::oneshot` for one-time communication between tasks. -- Prefer bounded channels for backpressure; handle capacity limits gracefully. -- Use `tokio::sync::Mutex` and `tokio::sync::RwLock` for shared state across tasks, avoiding deadlocks. - Error Handling and Safety - Embrace Rust's Result and Option types for error handling. - Use `?` operator to propagate errors in async functions. @@ -111,181 +18,10 @@ Error Handling and Safety - Handle errors and edge cases early, returning errors where appropriate. - Use `.await` responsibly, ensuring safe points for context switching. -Testing -- Write unit tests with `tokio::test` for async tests. -- Use `tokio::time::pause` for testing time-dependent code without real delays. -- Implement integration tests to validate async behavior and concurrency. -- Use mocks and fakes for external dependencies in tests. - -Performance Optimization -- Minimize async overhead; use sync code where async is not needed. -- Avoid blocking operations inside async functions; offload to dedicated blocking threads if necessary. -- Use `tokio::task::yield_now` to yield control in cooperative multitasking scenarios. -- Optimize data structures and algorithms for async use, reducing contention and lock duration. -- Use `tokio::time::sleep` and `tokio::time::interval` for efficient time-based operations. - Key Conventions 1. Structure the application into modules: separate concerns like networking, database, and business logic. 2. Use environment variables for configuration management (e.g., `dotenv` crate). 3. Ensure code is well-documented with inline comments and Rustdoc. +4. Do not use the older style of "MOD/mod.rs" for separing modules and instead use the "MOD.rs" filename convention. -Async Ecosystem -- Use `tokio` for async runtime and task management. -- Leverage `hyper` or `reqwest` for async HTTP requests. -- Use `serde` for serialization/deserialization. - - -## Tower crate - -- Tower is a library of modular and reusable components for building networking clients and servers. -- Tower provides a simple core abstraction, the Service trait, which represents an asynchronous function taking a request and returning either a response or an error. This abstraction can be used to model both clients and servers. -```rust -pub trait Service { - type Response; - type Error; - type Future: Future>; - - // Required methods - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>; - fn call(&mut self, req: Request) -> Self::Future; -} -``` -- Generic components, like timeout, rate limiting, and load balancing, can be modeled as Services that wrap some inner service and apply additional behavior before or after the inner service is called. This allows implementing these components in a protocol-agnostic, composable way. Typically, such services are referred to as middleware. -- An additional abstraction, the Layer trait, is used to compose middleware with Services. If a Service can be thought of as an asynchronous function from a request type to a response type, a Layer is a function taking a Service of one type and returning a Service of a different type. The ServiceBuilder type is used to add middleware to a service by composing it with multiple Layers. - -Here is an example of using tower service with timeout: -``` -use tokio::time::{sleep, Duration}; -use tower::{Service, ServiceBuilder}; -use std::task::{Context, Poll}; -use futures::future::{BoxFuture, FutureExt}; - -// Define a simple service that takes some time to respond -struct SlowService; - -impl Service for SlowService { - type Response = String; - type Error = &'static str; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: String) -> Self::Future { - println!("Processing request: {}", req); - - // Use an async block to create the future - async move { - // Simulate a slow response - sleep(Duration::from_secs(3)).await; - Ok(format!("Processed: {}", req)) - } - .boxed() // Convert the future into a BoxFuture - } -} - -#[tokio::main] -async fn main() { - // Create the base service - let service = SlowService; - - // Wrap the service with a timeout layer - let timeout_service = ServiceBuilder::new() - .timeout(Duration::from_secs(1)) - .service(service); - - // Use the timeout-wrapped service - let mut svc = timeout_service; - - match svc.call("Hello Tower!".to_string()).await { - Ok(response) => println!("Response: {}", response), - Err(err) => { - if let Some(_elapsed) = err.downcast_ref::() { - println!("Error: Timed out"); - } else { - println!("Error: {:?}", err); - } - } - } -} -``` - -## Actors in Tokio - -- An actor is a self-contained task (usually spawned by an async runtime like Tokio) that runs independently and communicates with other parts of the system by sending and receiving messages. - - Each actor encapsulates some resource or job. - - Other tasks/actors access it by sending messages rather than by directly sharing data. - -- An actor is split into two parts: - 1. The Task – A run function or method that processes incoming messages in a loop, shutting down gracefully when no more messages can be received. - 2. The Handle – A struct that owns a mpsc::Sender (or other channel) and exposes async methods to send messages to the actor. - -A minimal actor example might look like this: -``` -use tokio::sync::{oneshot, mpsc}; - -// Message type(s) the actor can receive. -enum ActorMessage { - GetUniqueId { - respond_to: oneshot::Sender, - }, -} - -// The actor’s internal state and logic. -struct MyActor { - receiver: mpsc::Receiver, - next_id: u32, -} - -impl MyActor { - fn new(receiver: mpsc::Receiver) -> Self { - Self { receiver, next_id: 0 } - } - - fn handle_message(&mut self, msg: ActorMessage) { - match msg { - ActorMessage::GetUniqueId { respond_to } => { - self.next_id += 1; - let _ = respond_to.send(self.next_id); // Ignore send errors - } - } - } -} - -// The actor’s main event loop. -async fn run_my_actor(mut actor: MyActor) { - while let Some(msg) = actor.receiver.recv().await { - actor.handle_message(msg); - } -} - -// The "handle": provides methods to send messages to the actor. -#[derive(Clone)] -struct MyActorHandle { - sender: mpsc::Sender, -} - -impl MyActorHandle { - // Creates both the sender and the spawned actor task. - pub fn new() -> Self { - let (sender, receiver) = mpsc::channel(8); - let actor = MyActor::new(receiver); - tokio::spawn(run_my_actor(actor)); - - Self { sender } - } - - // Example request-response method. - pub async fn get_unique_id(&self) -> u32 { - let (send, recv) = oneshot::channel(); - let msg = ActorMessage::GetUniqueId { respond_to: send }; - let _ = self.sender.send(msg).await; // Sends the message to the actor - recv.await.expect("Actor task was killed") - } -} -``` - +Refer to "The Rust Programming Language" book (2024 version) and "Command line apps in Rust" documentation for in-depth information on best practices, and advanced features. diff --git a/.goosehints b/.goosehints index 1a6e48eaa..4eaa69991 100644 --- a/.goosehints +++ b/.goosehints @@ -1,86 +1,8 @@ -Anthropic just released Model Context Protocol (MCP), a new standard for connecting AI assistants to the systems where data lives, including content repositories, business tools, and development environments. The Model Context Protocol is an open standard that enables developers to build secure, two-way connections between their data sources and AI-powered tools. The architecture is straightforward: developers can either expose their data through MCP servers or build AI applications (MCP clients) that connect to these servers. - -Model Context Protocol (MCP), is heavily inspired by Microsoft's Language Server Protocol (LSP) used in IDEs. - -# MCP Protocol - -## Lifecycle - -The Model Context Protocol (MCP) defines a rigorous lifecycle for client-server connections that ensures proper capability negotiation and state management. - -3 phases: -1. Initialization: Capability negotiation and protocol version agreement -2. Operation: Normal protocol communication -3. Shutdown: Graceful termination of the connection - - -### Initialization -The initialization phase MUST be the first interaction between client and server. During this phase, the client and server: -- Establish protocol version compatibility -- Exchange and negotiate capabilities -- Share implementation details - -The client MUST initiate this phase by sending an initialize request containing: -- Protocol version supported -- Client capabilities -- Client implementation information - -### Operation -During the operation phase, the client and server exchange messages according to the negotiated capabilities. - -Both parties SHOULD: -- Respect the negotiated protocol version -- Only use capabilities that were successfully negotiated - -### Shutdown -During the shutdown phase, one side (usually the client) cleanly terminates the protocol connection. No specific shutdown messages are defined—instead, the underlying transport mechanism should be used to signal connection termination: - -#### stdio -For the stdio transport, the client SHOULD initiate shutdown by: - -1. First, closing the input stream to the child process (the server) -2. Waiting for the server to exit, or sending SIGTERM if the server does not exit within a reasonable time -3. Sending SIGKILL if the server does not exit within a reasonable time after SIGTERM - -The server MAY initiate shutdown by closing its output stream to the client and exiting. - -#### HTTP with SSE - -For HTTP transports, shutdown is indicated by closing the associated HTTP connection(s). - - -## Transports - -2 standard transport mechanisms for client-server communication: - -1. stdio, communication over standard in and standard out -2. HTTP with Server-Sent Events (SSE) - -### stdio - -- The client launches the MCP server as a subprocess. -- The server receives JSON-RPC messages on its standard input (stdin) and writes responses to its standard output (stdout). -- Messages are delimited by newlines, and MUST NOT contain embedded newlines. -- The server MAY write UTF-8 strings to its standard error (stderr) for logging purposes. Clients MAY capture, forward, or ignore this logging. -- The server MUST NOT write anything to its stdout that is not a valid MCP message. -- The client MUST NOT write anything to the server’s stdin that is not a valid MCP message. - -### HTTP with SSE - -In the SSE transport, the server operates as an independent process that can handle multiple client connections. - -The server MUST provide two endpoints: -1. An SSE endpoint, for clients to establish a connection and receive messages from the server -2. A regular HTTP POST endpoint for clients to send messages to the server - -When a client connects, the server MUST send an endpoint event containing a URI for the client to use for sending messages. All subsequent client messages MUST be sent as HTTP POST requests to this endpoint. - -Server messages are sent as SSE message events, with the message content encoded as JSON in the event data. - ------------------- - -# General Guidelines for writing Rust code +You are an expert programmer in Rust teaching who is teaching another developer who is learning Rust. +The students are familiar with programming in languages such as Python (advanced), Java (novice) and C (novice) so +when possible use analogies from those languages. +Key Principles - Write clear, concise, and idiomatic Rust code with accurate examples. - Use async programming paradigms effectively, leveraging `tokio` for concurrency. - Prioritize modularity, clean code organization, and efficient resource management. @@ -89,21 +11,6 @@ Server messages are sent as SSE message events, with the message content encoded - Avoid code duplication; use functions and modules to encapsulate reusable logic. - Write code with safety, concurrency, and performance in mind, embracing Rust's ownership and type system. -Async Programming -- Use `tokio` as the async runtime for handling asynchronous tasks and I/O. -- Implement async functions using `async fn` syntax. -- Leverage `tokio::spawn` for task spawning and concurrency. -- Use `tokio::select!` for managing multiple async tasks and cancellations. -- Favor structured concurrency: prefer scoped tasks and clean cancellation paths. -- Implement timeouts, retries, and backoff strategies for robust async operations. - -Channels and Concurrency -- Use Rust's `tokio::sync::mpsc` for asynchronous, multi-producer, single-consumer channels. -- Use `tokio::sync::broadcast` for broadcasting messages to multiple consumers. -- Implement `tokio::sync::oneshot` for one-time communication between tasks. -- Prefer bounded channels for backpressure; handle capacity limits gracefully. -- Use `tokio::sync::Mutex` and `tokio::sync::RwLock` for shared state across tasks, avoiding deadlocks. - Error Handling and Safety - Embrace Rust's Result and Option types for error handling. - Use `?` operator to propagate errors in async functions. @@ -111,180 +18,10 @@ Error Handling and Safety - Handle errors and edge cases early, returning errors where appropriate. - Use `.await` responsibly, ensuring safe points for context switching. -Testing -- Write unit tests with `tokio::test` for async tests. -- Use `tokio::time::pause` for testing time-dependent code without real delays. -- Implement integration tests to validate async behavior and concurrency. -- Use mocks and fakes for external dependencies in tests. - -Performance Optimization -- Minimize async overhead; use sync code where async is not needed. -- Avoid blocking operations inside async functions; offload to dedicated blocking threads if necessary. -- Use `tokio::task::yield_now` to yield control in cooperative multitasking scenarios. -- Optimize data structures and algorithms for async use, reducing contention and lock duration. -- Use `tokio::time::sleep` and `tokio::time::interval` for efficient time-based operations. - Key Conventions 1. Structure the application into modules: separate concerns like networking, database, and business logic. 2. Use environment variables for configuration management (e.g., `dotenv` crate). 3. Ensure code is well-documented with inline comments and Rustdoc. +4. Do not use the older style of "MOD/mod.rs" for separing modules and instead use the "MOD.rs" filename convention. -Async Ecosystem -- Use `tokio` for async runtime and task management. -- Leverage `hyper` or `reqwest` for async HTTP requests. -- Use `serde` for serialization/deserialization. - - -## Tower crate - -- Tower is a library of modular and reusable components for building networking clients and servers. -- Tower provides a simple core abstraction, the Service trait, which represents an asynchronous function taking a request and returning either a response or an error. This abstraction can be used to model both clients and servers. -```rust -pub trait Service { - type Response; - type Error; - type Future: Future>; - - // Required methods - fn poll_ready( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>; - fn call(&mut self, req: Request) -> Self::Future; -} -``` -- Generic components, like timeout, rate limiting, and load balancing, can be modeled as Services that wrap some inner service and apply additional behavior before or after the inner service is called. This allows implementing these components in a protocol-agnostic, composable way. Typically, such services are referred to as middleware. -- An additional abstraction, the Layer trait, is used to compose middleware with Services. If a Service can be thought of as an asynchronous function from a request type to a response type, a Layer is a function taking a Service of one type and returning a Service of a different type. The ServiceBuilder type is used to add middleware to a service by composing it with multiple Layers. - -Here is an example of using tower service with timeout: -``` -use tokio::time::{sleep, Duration}; -use tower::{Service, ServiceBuilder}; -use std::task::{Context, Poll}; -use futures::future::{BoxFuture, FutureExt}; - -// Define a simple service that takes some time to respond -struct SlowService; - -impl Service for SlowService { - type Response = String; - type Error = &'static str; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: String) -> Self::Future { - println!("Processing request: {}", req); - - // Use an async block to create the future - async move { - // Simulate a slow response - sleep(Duration::from_secs(3)).await; - Ok(format!("Processed: {}", req)) - } - .boxed() // Convert the future into a BoxFuture - } -} - -#[tokio::main] -async fn main() { - // Create the base service - let service = SlowService; - - // Wrap the service with a timeout layer - let timeout_service = ServiceBuilder::new() - .timeout(Duration::from_secs(1)) - .service(service); - - // Use the timeout-wrapped service - let mut svc = timeout_service; - - match svc.call("Hello Tower!".to_string()).await { - Ok(response) => println!("Response: {}", response), - Err(err) => { - if let Some(_elapsed) = err.downcast_ref::() { - println!("Error: Timed out"); - } else { - println!("Error: {:?}", err); - } - } - } -} -``` - -## Actors in Tokio - -- An actor is a self-contained task (usually spawned by an async runtime like Tokio) that runs independently and communicates with other parts of the system by sending and receiving messages. - - Each actor encapsulates some resource or job. - - Other tasks/actors access it by sending messages rather than by directly sharing data. - -- An actor is split into two parts: - 1. The Task – A run function or method that processes incoming messages in a loop, shutting down gracefully when no more messages can be received. - 2. The Handle – A struct that owns a mpsc::Sender (or other channel) and exposes async methods to send messages to the actor. - -A minimal actor example might look like this: -``` -use tokio::sync::{oneshot, mpsc}; - -// Message type(s) the actor can receive. -enum ActorMessage { - GetUniqueId { - respond_to: oneshot::Sender, - }, -} - -// The actor’s internal state and logic. -struct MyActor { - receiver: mpsc::Receiver, - next_id: u32, -} - -impl MyActor { - fn new(receiver: mpsc::Receiver) -> Self { - Self { receiver, next_id: 0 } - } - - fn handle_message(&mut self, msg: ActorMessage) { - match msg { - ActorMessage::GetUniqueId { respond_to } => { - self.next_id += 1; - let _ = respond_to.send(self.next_id); // Ignore send errors - } - } - } -} - -// The actor’s main event loop. -async fn run_my_actor(mut actor: MyActor) { - while let Some(msg) = actor.receiver.recv().await { - actor.handle_message(msg); - } -} - -// The "handle": provides methods to send messages to the actor. -#[derive(Clone)] -struct MyActorHandle { - sender: mpsc::Sender, -} - -impl MyActorHandle { - // Creates both the sender and the spawned actor task. - pub fn new() -> Self { - let (sender, receiver) = mpsc::channel(8); - let actor = MyActor::new(receiver); - tokio::spawn(run_my_actor(actor)); - - Self { sender } - } - - // Example request-response method. - pub async fn get_unique_id(&self) -> u32 { - let (send, recv) = oneshot::channel(); - let msg = ActorMessage::GetUniqueId { respond_to: send }; - let _ = self.sender.send(msg).await; // Sends the message to the actor - recv.await.expect("Actor task was killed") - } -} -``` +Refer to "The Rust Programming Language" book (2024 version) and "Command line apps in Rust" documentation for in-depth information on best practices, and advanced features. From f6b874ca63b12d0c43d9a9a30e31479dc165d81a Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 17:00:02 -0500 Subject: [PATCH 08/12] service needs to be wrapped in Mutex --- crates/mcp-client/examples/clients.rs | 4 +- .../mcp-client/examples/stdio_integration.rs | 4 +- crates/mcp-client/examples/svc.rs | 54 ------------------- crates/mcp-client/src/client.rs | 36 ++++++++----- crates/mcp-client/src/service.rs | 15 +++--- 5 files changed, 35 insertions(+), 78 deletions(-) delete mode 100644 crates/mcp-client/examples/svc.rs diff --git a/crates/mcp-client/examples/clients.rs b/crates/mcp-client/examples/clients.rs index a33e0a792..e3951a66b 100644 --- a/crates/mcp-client/examples/clients.rs +++ b/crates/mcp-client/examples/clients.rs @@ -17,7 +17,7 @@ async fn main() -> Result<(), Box> { EnvFilter::from_default_env().add_directive("mcp_client=debug".parse().unwrap()), ) .init(); - + let transport1 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()]); let handle1 = transport1.start().await?; let service1 = McpService::with_timeout(handle1, Duration::from_secs(30)); @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { let transport3 = SseTransport::new("http://localhost:8000/sse"); let handle3 = transport3.start().await?; - let service3 = McpService::with_timeout(handle3, Duration::from_secs(3)); + let service3 = McpService::with_timeout(handle3, Duration::from_secs(10)); let client3 = McpClient::new(service3); // Initialize both clients diff --git a/crates/mcp-client/examples/stdio_integration.rs b/crates/mcp-client/examples/stdio_integration.rs index 183dfcb9f..0b51b1812 100644 --- a/crates/mcp-client/examples/stdio_integration.rs +++ b/crates/mcp-client/examples/stdio_integration.rs @@ -3,7 +3,9 @@ use std::time::Duration; // This example shows how to use the mcp-client crate to interact with a server that has a simple counter tool. // The server is started by running `cargo run -p mcp-server` in the root of the mcp-server crate. use anyhow::Result; -use mcp_client::client::{ClientCapabilities, ClientInfo, Error as ClientError, McpClient}; +use mcp_client::client::{ + ClientCapabilities, ClientInfo, Error as ClientError, McpClient, McpClientTrait, +}; use mcp_client::transport::{StdioTransport, Transport}; use mcp_client::McpService; use tracing_subscriber::EnvFilter; diff --git a/crates/mcp-client/examples/svc.rs b/crates/mcp-client/examples/svc.rs deleted file mode 100644 index e889bc1af..000000000 --- a/crates/mcp-client/examples/svc.rs +++ /dev/null @@ -1,54 +0,0 @@ -use futures::future::{BoxFuture, FutureExt}; -use std::task::{Context, Poll}; -use tokio::time::{sleep, Duration}; -use tower::{Service, ServiceBuilder}; - -// Define a simple service that takes some time to respond -struct SlowService; - -impl Service for SlowService { - type Response = String; - type Error = &'static str; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: String) -> Self::Future { - println!("Processing request: {}", req); - - // Use an async block to create the future - async move { - // Simulate a slow response - sleep(Duration::from_secs(3)).await; - Ok(format!("Processed: {}", req)) - } - .boxed() // Convert the future into a BoxFuture - } -} - -#[tokio::main] -async fn main() { - // Create the base service - let service = SlowService; - - // Wrap the service with a timeout layer - let timeout_service = ServiceBuilder::new() - .timeout(Duration::from_secs(1)) - .service(service); - - // Use the timeout-wrapped service - let mut svc = timeout_service; - - match svc.call("Hello Tower!".to_string()).await { - Ok(response) => println!("Response: {}", response), - Err(err) => { - if let Some(_elapsed) = err.downcast_ref::() { - println!("Error: Timed out"); - } else { - println!("Error: {:?}", err); - } - } - } -} diff --git a/crates/mcp-client/src/client.rs b/crates/mcp-client/src/client.rs index 58813e7e8..796b7884d 100644 --- a/crates/mcp-client/src/client.rs +++ b/crates/mcp-client/src/client.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::{AtomicU64, Ordering}; - use mcp_core::protocol::{ CallToolResult, InitializeResult, JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, ListResourcesResult, ListToolsResult, ReadResourceResult, @@ -7,8 +5,10 @@ use mcp_core::protocol::{ }; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::sync::atomic::{AtomicU64, Ordering}; use thiserror::Error; -use tower::Service; +use tokio::sync::Mutex; +use tower::{Service, ServiceExt}; // for Service::ready() /// Error type for MCP client operations. #[derive(Debug, Error)] @@ -22,8 +22,8 @@ pub enum Error { #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), - #[error("Unexpected response from server")] - UnexpectedResponse, + #[error("Unexpected response from server: {0}")] + UnexpectedResponse(String), #[error("Not initialized")] NotInitialized, @@ -85,7 +85,7 @@ where S::Error: Into, S::Future: Send, { - service: S, + service: Mutex, next_id: AtomicU64, server_capabilities: Option, } @@ -98,7 +98,7 @@ where { pub fn new(service: S) -> Self { Self { - service, + service: Mutex::new(service), next_id: AtomicU64::new(1), server_capabilities: None, } @@ -109,6 +109,9 @@ where where R: for<'de> Deserialize<'de>, { + let mut service = self.service.lock().await; + service.ready().await.map_err(|_| Error::NotReady)?; + let id = self.next_id.fetch_add(1, Ordering::SeqCst); let request = JsonRpcMessage::Request(JsonRpcRequest { jsonrpc: "2.0".to_string(), @@ -117,7 +120,6 @@ where params: Some(params), }); - let mut service = self.service.clone(); let response_msg = service.call(request).await.map_err(Into::into)?; match response_msg { @@ -126,7 +128,9 @@ where }) => { // Verify id matches if id != Some(self.next_id.load(Ordering::SeqCst) - 1) { - return Err(Error::UnexpectedResponse); + return Err(Error::UnexpectedResponse( + "id mismatch for JsonRpcResponse".to_string(), + )); } if let Some(err) = error { Err(Error::RpcError { @@ -136,12 +140,14 @@ where } else if let Some(r) = result { Ok(serde_json::from_value(r)?) } else { - Err(Error::UnexpectedResponse) + Err(Error::UnexpectedResponse("missing result".to_string())) } } JsonRpcMessage::Error(JsonRpcError { id, error, .. }) => { if id != Some(self.next_id.load(Ordering::SeqCst) - 1) { - return Err(Error::UnexpectedResponse); + return Err(Error::UnexpectedResponse( + "id mismatch for JsonRpcError".to_string(), + )); } Err(Error::RpcError { code: error.code, @@ -150,20 +156,24 @@ where } _ => { // Requests/notifications not expected as a response - Err(Error::UnexpectedResponse) + Err(Error::UnexpectedResponse( + "unexpected message type".to_string(), + )) } } } /// Send a JSON-RPC notification. async fn send_notification(&self, method: &str, params: Value) -> Result<(), Error> { + let mut service = self.service.lock().await; + service.ready().await.map_err(|_| Error::NotReady)?; + let notification = JsonRpcMessage::Notification(JsonRpcNotification { jsonrpc: "2.0".to_string(), method: method.to_string(), params: Some(params), }); - let mut service = self.service.clone(); service.call(notification).await.map_err(Into::into)?; Ok(()) } diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs index 946233d31..3e6b7afc5 100644 --- a/crates/mcp-client/src/service.rs +++ b/crates/mcp-client/src/service.rs @@ -1,5 +1,6 @@ use futures::future::BoxFuture; use mcp_core::protocol::JsonRpcMessage; +use std::sync::Arc; use std::task::{Context, Poll}; use tower::{timeout::Timeout, Service, ServiceBuilder}; @@ -7,17 +8,15 @@ use crate::transport::{Error, TransportHandle}; /// A wrapper service that implements Tower's Service trait for MCP transport #[derive(Clone)] -pub struct McpService { - inner: T, +pub struct McpService { + inner: Arc, } -impl McpService { +impl McpService { pub fn new(transport: T) -> Self { - Self { inner: transport } - } - - pub fn into_inner(self) -> T { - self.inner + Self { + inner: Arc::new(transport), + } } } From 646d61d0f92faaabb612c5d9240ee2ddc19b4485 Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 17:14:47 -0500 Subject: [PATCH 09/12] update goose agent --- crates/goose/src/agents/capabilities.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/goose/src/agents/capabilities.rs b/crates/goose/src/agents/capabilities.rs index 4007896ac..f50ac3e07 100644 --- a/crates/goose/src/agents/capabilities.rs +++ b/crates/goose/src/agents/capabilities.rs @@ -1,15 +1,17 @@ use chrono::{DateTime, TimeZone, Utc}; +use mcp_client::McpService; use rust_decimal_macros::dec; use std::collections::HashMap; use std::sync::Arc; use std::sync::LazyLock; +use std::time::Duration; use tokio::sync::Mutex; use tracing::{debug, instrument}; use super::system::{SystemConfig, SystemError, SystemInfo, SystemResult}; use crate::prompt_template::load_prompt_file; use crate::providers::base::{Provider, ProviderUsage}; -use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient}; +use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait}; use mcp_client::transport::{SseTransport, StdioTransport, Transport}; use mcp_core::{Content, Tool, ToolCall, ToolError, ToolResult}; @@ -20,7 +22,7 @@ static DEFAULT_TIMESTAMP: LazyLock> = /// Manages MCP clients and their interactions pub struct Capabilities { - clients: HashMap>>, + clients: HashMap>>>, instructions: HashMap, provider: Box, provider_usage: Mutex>, @@ -87,14 +89,18 @@ impl Capabilities { /// Add a new MCP system based on the provided client type // TODO IMPORTANT need to ensure this times out if the system command is broken! pub async fn add_system(&mut self, config: SystemConfig) -> SystemResult<()> { - let mut client: McpClient = match config { + let mut client: Box = match config { SystemConfig::Sse { ref uri } => { let transport = SseTransport::new(uri); - McpClient::new(transport.start().await?) + let handle = transport.start().await?; + let service = McpService::with_timeout(handle, Duration::from_secs(10)); + Box::new(McpClient::new(service)) } SystemConfig::Stdio { ref cmd, ref args } => { let transport = StdioTransport::new(cmd, args.to_vec()); - McpClient::new(transport.start().await?) + let handle = transport.start().await?; + let service = McpService::with_timeout(handle, Duration::from_secs(10)); + Box::new(McpClient::new(service)) } }; @@ -258,7 +264,10 @@ impl Capabilities { } /// Find and return a reference to the appropriate client for a tool call - fn get_client_for_tool(&self, prefixed_name: &str) -> Option>> { + fn get_client_for_tool( + &self, + prefixed_name: &str, + ) -> Option>>> { prefixed_name .split_once("__") .and_then(|(client_name, _)| self.clients.get(client_name)) From 27d224e3bc8e8d80382e42ae78e6173413b6f6cb Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Fri, 10 Jan 2025 17:22:09 -0500 Subject: [PATCH 10/12] fmt --- crates/mcp-server/src/main.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/mcp-server/src/main.rs b/crates/mcp-server/src/main.rs index b89c44af7..eee250025 100644 --- a/crates/mcp-server/src/main.rs +++ b/crates/mcp-server/src/main.rs @@ -58,7 +58,10 @@ impl Router for CounterRouter { } fn capabilities(&self) -> ServerCapabilities { - CapabilitiesBuilder::new().with_tools(false).with_resources(false, false).build() + CapabilitiesBuilder::new() + .with_tools(false) + .with_resources(false, false) + .build() } fn list_tools(&self) -> Vec { From 6ba8c8b631dbc6fbff3239a8d8b1fa19ee8f2d04 Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Sun, 12 Jan 2025 19:03:34 -0500 Subject: [PATCH 11/12] include timed out msg in system error - client --- crates/goose/src/agents/system.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/goose/src/agents/system.rs b/crates/goose/src/agents/system.rs index 022064a6a..d7e98742e 100644 --- a/crates/goose/src/agents/system.rs +++ b/crates/goose/src/agents/system.rs @@ -7,7 +7,7 @@ use thiserror::Error; pub enum SystemError { #[error("Failed to start the MCP server from configuration `{0}` within 60 seconds")] Initialization(SystemConfig), - #[error("Failed a client call to an MCP server")] + #[error("Failed a client call to an MCP server: {0}")] Client(#[from] ClientError), #[error("Transport error: {0}")] Transport(#[from] mcp_client::transport::Error), From 1ede8c21cbfaba76a86aae96d6c037fb3531e6a5 Mon Sep 17 00:00:00 2001 From: Salman Mohammed Date: Mon, 13 Jan 2025 14:48:41 -0500 Subject: [PATCH 12/12] fmt --- crates/goose/src/agents/capabilities.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/goose/src/agents/capabilities.rs b/crates/goose/src/agents/capabilities.rs index 401d9dc94..94a47cd11 100644 --- a/crates/goose/src/agents/capabilities.rs +++ b/crates/goose/src/agents/capabilities.rs @@ -96,7 +96,11 @@ impl Capabilities { let service = McpService::with_timeout(handle, Duration::from_secs(10)); Box::new(McpClient::new(service)) } - SystemConfig::Stdio { ref cmd, ref args, ref envs } => { + SystemConfig::Stdio { + ref cmd, + ref args, + ref envs, + } => { let transport = StdioTransport::new(cmd, args.to_vec(), envs.get_env()); let handle = transport.start().await?; let service = McpService::with_timeout(handle, Duration::from_secs(10));