Skip to content

Commit

Permalink
Use Rayon instead of spawn_blocking for WSGI
Browse files Browse the repository at this point in the history
  • Loading branch information
levkk committed Oct 26, 2024
1 parent 4272425 commit ea4cbd6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 17 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rwf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ description = "Comprehensive framework for building web applications in Rust"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
wsgi = ["pyo3"]
wsgi = ["pyo3", "rayon"]
default = []

[dependencies]
Expand Down Expand Up @@ -37,6 +37,7 @@ regex = "1"
sha1 = "0.10"
toml = "0.8"
pyo3 = { version = "0.22", features = ["auto-initialize"], optional = true }
rayon = { version = "1", optional = true }

[dev-dependencies]
tempdir = "0.3"
42 changes: 26 additions & 16 deletions rwf/src/controller/wsgi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,61 @@ use crate::http::{wsgi::WsgiRequest, Request, Response};

use async_trait::async_trait;
use pyo3::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use tokio::sync::oneshot::channel;
use tokio::time::{timeout, Duration};

pub struct WsgiController {
path: &'static str,
timeout: Duration,
pool: ThreadPool,
}

impl WsgiController {
pub fn new(path: &'static str) -> Self {
WsgiController {
path,
timeout: Duration::from_secs(60),
pool: ThreadPoolBuilder::new().num_threads(2).build().unwrap(),
}
}

pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

pub fn max_threads(mut self, threads: usize) -> Self {
self.pool = ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.unwrap();
self
}
}

#[async_trait]
impl Controller for WsgiController {
async fn handle(&self, request: &Request) -> Result<Response, Error> {
let request = WsgiRequest::from_request(request)?;
let path = self.path;
let (tx, rx) = channel();

self.pool.spawn(move || {
let response = Python::with_gil(|py| {
// Import is cached.
let module = PyModule::import_bound(py, path).unwrap();
let application: Py<PyAny> = module.getattr("application").unwrap().into();
request.send(&application)
})
.unwrap();

tx.send(response).unwrap();
});

// TODO: spawn blocking tasks cannot be aborted.
// This only aborts waiting for the result to be returned.
match timeout(
self.timeout,
tokio::task::spawn_blocking(move || {
let response = Python::with_gil(|py| {
// Import is cached.
let module = PyModule::import_bound(py, path).unwrap();
let application: Py<PyAny> = module.getattr("application").unwrap().into();
request.send(&application)
})
.unwrap();

response
}),
)
.await?
{
match timeout(self.timeout, rx).await? {
Ok(response) => Ok(response.to_response()?),
Err(e) => Ok(Response::internal_error(e)),
}
Expand Down

0 comments on commit ea4cbd6

Please sign in to comment.