Skip to content

Commit

Permalink
add list of subgraphs/sources to context
Browse files Browse the repository at this point in the history
this allows quickly understanding which connector sources are used in a query plan. you can then use a coprocessor to fetch auth tokens, add them to the context, and use them with `headers: [{ name: "Authorization", value: "Bearer {$context.token}" }]`
  • Loading branch information
lennyburdette committed Oct 31, 2024
1 parent 11eacb4 commit cd2a1b7
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 11 deletions.
50 changes: 49 additions & 1 deletion apollo-router/src/plugins/connectors/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::json;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt as TowerServiceExt;

use super::query_plans::get_connectors;
use crate::layers::ServiceExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugins::connectors::configuration::ConnectorsConfig;
use crate::plugins::connectors::request_limit::RequestLimits;
use crate::register_plugin;
use crate::services::execution;
use crate::services::router::body::RouterBody;
use crate::services::supergraph;

const CONNECTORS_DEBUG_HEADER_NAME: &str = "Apollo-Connectors-Debugging";
const CONNECTORS_DEBUG_ENV: &str = "APOLLO_CONNECTORS_DEBUGGING";
const CONNECTORS_DEBUG_KEY: &str = "apolloConnectorsDebugging";
const CONNECTORS_MAX_REQUESTS_ENV: &str = "APOLLO_CONNECTORS_MAX_REQUESTS_PER_OPERATION";
const CONNECTOR_SOURCES_IN_QUERY_PLAN: &str = "apollo_connectors::sources_in_query_plan";

static LAST_DEBUG_ENABLED_VALUE: AtomicBool = AtomicBool::new(false);

Expand Down Expand Up @@ -121,7 +126,7 @@ impl Plugin for Connectors {
if let Some(first) = &mut first {
if let Some(inner) = Arc::into_inner(debug) {
first.extensions.insert(
"apolloConnectorsDebugging",
CONNECTORS_DEBUG_KEY,
json!({"version": "1", "data": inner.into_inner().serialize() }),
);
}
Expand All @@ -143,6 +148,49 @@ impl Plugin for Connectors {
)
.boxed()
}

fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
ServiceBuilder::new()
.map_request(|req: execution::Request| {
let Some(connectors) = get_connectors(&req.context) else {
return req;
};

// add [{"subgraph_name": "", "source_name": ""}] to the context
// for connectors with sources in the query plan.
let list = req
.query_plan
.root
.service_usage()
.unique()
.flat_map(|service_name| {
let Some(connector) = connectors.get(service_name) else {
return None;
};

let Some(ref source_name) = connector.id.source_name else {
return None;
};

Some((connector.id.subgraph_name.clone(), source_name.clone()))
})
.unique()
.map(|(subgraph_name, source_name)| {
json!({
"subgraph_name": subgraph_name,
"source_name": source_name,
})
})
.collect_vec();

req.context
.insert(CONNECTOR_SOURCES_IN_QUERY_PLAN, list)
.unwrap();
req
})
.service(service)
.boxed()
}
}

pub(crate) const PLUGIN_NAME: &str = "preview_connectors";
Expand Down
28 changes: 23 additions & 5 deletions apollo-router/src/plugins/connectors/query_plans.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
use std::sync::Arc;

use apollo_federation::sources::connect::Connector;
use indexmap::IndexMap;

use crate::query_planner::PlanNode;
use crate::Context;

type ConnectorsContext = Arc<IndexMap<Arc<str>, String>>;
type ConnectorsByServiceName = Arc<IndexMap<Arc<str>, Connector>>;

pub(crate) fn store_connectors_context(
pub(crate) fn store_connectors(
context: &Context,
connectors_by_service_name: Arc<IndexMap<Arc<str>, Connector>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsByServiceName>(connectors_by_service_name));
}

pub(crate) fn get_connectors(context: &Context) -> Option<ConnectorsByServiceName> {
context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsByServiceName>().cloned())
}

type ConnectorLabels = Arc<IndexMap<Arc<str>, String>>;

pub(crate) fn store_connectors_labels(
context: &Context,
labels_by_service_name: Arc<IndexMap<Arc<str>, String>>,
) {
context
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorsContext>(labels_by_service_name));
.with_lock(|mut lock| lock.insert::<ConnectorLabels>(labels_by_service_name));
}

pub(crate) fn replace_connector_service_names_text(
Expand All @@ -22,7 +40,7 @@ pub(crate) fn replace_connector_service_names_text(
) -> Option<Arc<String>> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());
if let Some(replacements) = replacements {
text.as_ref().map(|text| {
let mut text = text.to_string();
Expand All @@ -42,7 +60,7 @@ pub(crate) fn replace_connector_service_names(
) -> Arc<PlanNode> {
let replacements = context
.extensions()
.with_lock(|lock| lock.get::<ConnectorsContext>().cloned());
.with_lock(|lock| lock.get::<ConnectorLabels>().cloned());

return if let Some(replacements) = replacements {
let mut plan = plan.clone();
Expand Down
72 changes: 72 additions & 0 deletions apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,78 @@ async fn test_interface_object() {
);
}

#[tokio::test]
async fn test_sources_in_context() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/coprocessor"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"control": "continue",
"version": 1,
"stage": "ExecutionRequest"
})))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/posts"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!([
{ "userId": 1, "id": 1, "title": "title", "body": "body" },
{ "userId": 1, "id": 2, "title": "title", "body": "body" }]
)))
.mount(&mock_server)
.await;
Mock::given(method("GET"))
.and(path("/users/1"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"id": 1,
"name": "Leanne Graham",
"username": "Bret"
})))
.mount(&mock_server)
.await;
let uri = mock_server.uri();

let _ = execute(
&QUICKSTART_SCHEMA.replace("https://jsonplaceholder.typicode.com", &mock_server.uri()),
&uri,
"query Posts { posts { id body title author { name username } } }",
Default::default(),
Some(json!({
"coprocessor": {
"url": format!("{}/coprocessor", mock_server.uri()),
"execution": {
"request": {
"context": true
}
}
}
})),
|_| {},
)
.await;

let requests = &mock_server.received_requests().await.unwrap();
let coprocessor_request = requests.first().unwrap();
let body = coprocessor_request
.body_json::<serde_json_bytes::Value>()
.unwrap();
pretty_assertions::assert_eq!(
body.get("context")
.unwrap()
.as_object()
.unwrap()
.get("entries")
.unwrap()
.as_object()
.unwrap()
.get("apollo_connectors::sources_in_query_plan")
.unwrap(),
&serde_json_bytes::json!([
{ "subgraph_name": "connectors", "source_name": "jsonPlaceholder" }
])
);
}

mod quickstart_tests {
use super::*;

Expand Down
4 changes: 4 additions & 0 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl QueryPlan {
pub fn subgraph_fetches(&self) -> usize {
self.root.subgraph_fetches()
}

pub fn service_usage<'a>(&'a self) -> Box<dyn Iterator<Item = &'a str> + 'a> {
self.root.service_usage()
}
}

// holds the query plan executon arguments that do not change between calls
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ impl FetchNode {
}
}

#[cfg(test)]
pub(crate) fn service_name(&self) -> &str {
&self.service_name
}
Expand Down
1 change: 0 additions & 1 deletion apollo-router/src/query_planner/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,6 @@ impl PlanNode {
Ok(())
}

#[cfg(test)]
/// Retrieves all the services used across all plan nodes.
///
/// Note that duplicates are not filtered.
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/router_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,9 @@ pub(crate) async fn create_plugins(
add_optional_apollo_plugin!("demand_control");

// This relative ordering is documented in `docs/source/customizations/native.mdx`:
add_optional_apollo_plugin!("preview_connectors");
add_optional_apollo_plugin!("rhai");
add_optional_apollo_plugin!("coprocessor");
add_optional_apollo_plugin!("preview_connectors");
add_user_plugins!();

// Macros above remove from `apollo_plugin_factories`, so anything left at the end
Expand Down
6 changes: 4 additions & 2 deletions apollo-router/src/services/supergraph/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::graphql;
use crate::graphql::IntoGraphQLErrors;
use crate::graphql::Response;
use crate::plugin::DynPlugin;
use crate::plugins::connectors::query_plans::store_connectors_context;
use crate::plugins::connectors::query_plans::store_connectors;
use crate::plugins::connectors::query_plans::store_connectors_labels;
use crate::plugins::subscription::Subscription;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
Expand Down Expand Up @@ -130,7 +131,8 @@ impl Service<SupergraphRequest> for SupergraphService {

fn call(&mut self, req: SupergraphRequest) -> Self::Future {
if let Some(connectors) = &self.schema.connectors {
store_connectors_context(&req.context, connectors.labels_by_service_name.clone());
store_connectors_labels(&req.context, connectors.labels_by_service_name.clone());
store_connectors(&req.context, connectors.by_service_name.clone());
}

// Consume our cloned services and allow ownership to be transferred to the async block.
Expand Down

0 comments on commit cd2a1b7

Please sign in to comment.