Move computation-heavy tasks to a dedicated thread pool #6122

Closed
wants to merge 6 commits into from
Changes from 2 commits
13 changes: 13 additions & 0 deletions .changesets/fix_simon_compute_task.md
@@ -0,0 +1,13 @@
### Move computation-heavy tasks to a dedicated thread pool ([PR #6122](https://github.com/apollographql/router/pull/6122))

These components can take non-trivial amounts of CPU time:
* GraphQL parsing
* GraphQL validation
* Query planning
* Schema introspection

To avoid blocking threads that execute asynchronous code, they now run (in their respective Rust implementations) in a new pool with as many threads as there are available CPU cores. Previously we used Tokio’s [`spawn_blocking`] for this purpose, but it appears to be intended for blocking I/O and uses up to 512 threads, so it isn’t a great fit for computation tasks.

[`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html

By [@SimonSapin](https://github.com/SimonSapin) in https://github.com/apollographql/router/pull/6122
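
As a rough illustration of the idea described above (a pool with one worker per available CPU core, handing results back over a channel), here is a minimal standard-library-only sketch. It is not the code from this PR: `ComputePool` and its methods are hypothetical names, and `std::sync::mpsc` stands in for both the `threadpool` crate and Tokio's `oneshot` channel so that the example has no dependencies.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// A boxed job that a worker thread runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool for CPU-bound work (illustration only).
pub struct ComputePool {
    sender: mpsc::Sender<Job>,
    size: usize,
}

impl ComputePool {
    /// Starts one worker per available CPU core (falling back to 1 if unknown).
    pub fn new() -> Self {
        let size = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        for _ in 0..size {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock is held only while waiting for a job, not while running it.
                let job = receiver.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    // All senders were dropped: the pool is gone, so stop.
                    Err(_) => break,
                }
            });
        }
        Self { sender, size }
    }

    /// Number of worker threads started by `new`.
    pub fn size(&self) -> usize {
        self.size
    }

    /// Queues `f` and returns a receiver that yields its result.
    /// Unlike the `threadpool` crate, this sketch does not replace a worker
    /// whose job panicked.
    pub fn execute<T, F>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        self.sender
            .send(Box::new(move || {
                // The caller may have dropped the receiver; ignore that case.
                let _ = tx.send(f());
            }))
            .expect("worker threads have exited");
        rx
    }
}
```

Because the number of workers is fixed, submitting more jobs than there are cores queues the extra jobs instead of starting more threads, which is the property that makes this shape a better fit for computation than a large blocking-I/O pool.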
1 change: 1 addition & 0 deletions Cargo.lock
@@ -361,6 +361,7 @@ dependencies = [
"tempfile",
"test-log",
"thiserror",
"threadpool",
"tikv-jemallocator",
"time",
"tokio",
1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
@@ -209,6 +209,7 @@ static_assertions = "1.1.0"
strum_macros = "0.26.0"
sys-info = "0.9.1"
thiserror = "1.0.61"
threadpool = "1.8.1"
tokio.workspace = true
tokio-stream = { version = "0.1.15", features = ["sync", "net"] }
tokio-util = { version = "0.7.11", features = ["net", "codec", "time"] }
48 changes: 48 additions & 0 deletions apollo-router/src/compute_task.rs
@@ -0,0 +1,48 @@
use std::sync::OnceLock;

use tokio::sync::oneshot;

static POOL: OnceLock<threadpool::ThreadPool> = OnceLock::new();

/// Drop-in replacement for Tokio’s [`spawn_blocking`]
/// but intended for tasks that keep a CPU core active: it uses a thread pool
/// limited to the number of available CPU cores.
///
/// `spawn_blocking` on the other hand appears to be intended for tasks that pause a thread
/// in a blocking syscall: `max_blocking_threads` is configurable but defaults to 512,
/// which is too high for tasks where the CPU is actively running.
/// Configuring it to $NUM_CPUS feels risky in case a library we use relies on `spawn_blocking`
/// in the originally intended way.
///
/// [`spawn_blocking`]: https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html
///
/// ## Example
///
/// Doing this in an async function can work but isn't great for Tokio performance:
///
/// ```no_run
/// let output = some_heavy_computation();
/// ```
///
/// It can be replaced with:
///
/// ```no_run
/// use crate::compute_task;
///
/// let output = compute_task::execute(|| some_heavy_computation()).await;
/// ```
pub(crate) fn execute<T, F>(f: F) -> oneshot::Receiver<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    // https://docs.rs/threadpool/latest/threadpool/struct.Builder.html#method.num_threads
    // > defaults the number of threads to the number of CPUs
    let pool = POOL.get_or_init(|| threadpool::Builder::new().build());

    let (tx, rx) = oneshot::channel();
    pool.execute(move || {
        let _ = tx.send(f());
    });
    rx
}
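
One consequence of returning a channel receiver is worth spelling out: if the closure panics, the sender is dropped during unwinding without sending a value, so the waiting side sees a receive error instead of a result. The call sites in this PR rely on that when they call `.expect(...)` on the awaited receiver. The sketch below shows the same mechanism with the standard library only. `execute_on_thread` and `wait` are hypothetical helpers, a plain thread stands in for the pool, `std::sync::mpsc` stands in for Tokio's `oneshot`, and the default `panic = "unwind"` setting is assumed.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs `f` on a new thread and returns a receiver for its result.
/// (A plain thread stands in for the thread pool here, for brevity.)
pub fn execute_on_thread<T, F>(f: F) -> mpsc::Receiver<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If `f` panics, `tx` is dropped during unwinding without sending.
        let _ = tx.send(f());
    });
    rx
}

/// Hypothetical helper: maps a dropped sender to a descriptive error
/// instead of panicking in the caller.
pub fn wait<T>(rx: mpsc::Receiver<T>) -> Result<T, String> {
    rx.recv()
        .map_err(|_| "compute task panicked or was dropped".to_string())
}
```

With these helpers, `wait(execute_on_thread(|| 2 + 2))` yields `Ok(4)`, while a closure that panics yields an `Err`. Whether a caller should turn that error into a panic, as `.expect(...)` does, or into a regular error value is a design choice this sketch leaves open.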
1 change: 1 addition & 0 deletions apollo-router/src/lib.rs
@@ -56,6 +56,7 @@ mod apollo_studio_interop;
pub(crate) mod axum_factory;
mod batching;
mod cache;
mod compute_task;
mod configuration;
mod context;
mod error;
13 changes: 6 additions & 7 deletions apollo-router/src/query_planner/bridge_query_planner.rs
@@ -31,6 +31,7 @@ use super::PlanNode;
use super::QueryKey;
use crate::apollo_studio_interop::generate_usage_reporting;
use crate::cache::storage::CacheStorage;
use crate::compute_task;
use crate::configuration::IntrospectionMode as IntrospectionConfig;
use crate::configuration::QueryPlannerMode;
use crate::error::PlanErrors;
@@ -292,7 +293,7 @@ impl PlannerMode {
PlannerMode::Rust(rust_planner) => {
let doc = doc.clone();
let rust_planner = rust_planner.clone();
-let (plan, mut root_node) = tokio::task::spawn_blocking(move || {
+let (plan, mut root_node) = compute_task::execute(move || {
let start = Instant::now();

let query_plan_options = QueryPlanOptions {
@@ -543,11 +544,9 @@ impl BridgeQueryPlanner {
IntrospectionMode::Rust => {
let schema = self.schema.clone();
let response = Box::new(
-tokio::task::spawn_blocking(move || {
-Self::rust_introspection(&schema, &key, &doc)
-})
-.await
-.expect("Introspection panicked")?,
+compute_task::execute(move || Self::rust_introspection(&schema, &key, &doc))
+.await
+.expect("Introspection panicked")?,
);
return Ok(QueryPlannerContent::Response { response });
}
@@ -581,7 +580,7 @@ impl BridgeQueryPlanner {
.map_err(QueryPlannerError::Introspection);
let schema = self.schema.clone();
let js_result_clone = js_result.clone();
-tokio::task::spawn_blocking(move || {
+compute_task::execute(move || {
let rust_result = match Self::rust_introspection(&schema, &key, &doc) {
Ok(response) => {
if response.errors.is_empty() {
6 changes: 3 additions & 3 deletions apollo-router/src/services/layers/query_analysis.rs
@@ -13,10 +13,10 @@ use http::StatusCode;
use lru::LruCache;
use router_bridge::planner::UsageReporting;
use tokio::sync::Mutex;
-use tokio::task;

use crate::apollo_studio_interop::generate_extended_references;
use crate::apollo_studio_interop::ExtendedReferenceStats;
+use crate::compute_task;
use crate::context::OPERATION_KIND;
use crate::context::OPERATION_NAME;
use crate::graphql::Error;
@@ -85,11 +85,11 @@ impl QueryAnalysisLayer {
let schema = self.schema.clone();
let conf = self.configuration.clone();

-// Must be created *outside* of the spawn_blocking or the span is not connected to the
+// Must be created *outside* of the compute_task or the span is not connected to the
// parent
let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL");

-task::spawn_blocking(move || {
+compute_task::execute(move || {
span.in_scope(|| {
let doc = Query::parse_document(
&query,