Skip to content

Commit

Permalink
Create the invalidation endpoint for entity caching (#5614)
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <[email protected]>
Co-authored-by: Geoffroy Couprie <[email protected]>
  • Loading branch information
bnjjj and Geal authored Jul 30, 2024
1 parent e16c9bf commit d940c3f
Show file tree
Hide file tree
Showing 19 changed files with 944 additions and 96 deletions.
8 changes: 6 additions & 2 deletions apollo-router/src/configuration/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,12 @@ pub(crate) fn validate_yaml_configuration(
let offset = start_marker
.line()
.saturating_sub(NUMBER_OF_PREVIOUS_LINES_TO_DISPLAY);

let lines = yaml_split_by_lines[offset..end_marker.line()]
let end = if end_marker.line() > yaml_split_by_lines.len() {
yaml_split_by_lines.len()
} else {
end_marker.line()
};
let lines = yaml_split_by_lines[offset..end]
.iter()
.map(|line| format!(" {line}"))
.join("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,11 @@ expression: "&schema"
"description": "Enable or disable the entity caching feature",
"type": "boolean"
},
"invalidation": {
"$ref": "#/definitions/InvalidationEndpointConfig",
"description": "#/definitions/InvalidationEndpointConfig",
"nullable": true
},
"metrics": {
"$ref": "#/definitions/Metrics",
"description": "#/definitions/Metrics"
Expand Down Expand Up @@ -3518,6 +3523,24 @@ expression: "&schema"
},
"type": "object"
},
"InvalidationEndpointConfig": {
"additionalProperties": false,
"properties": {
"listen": {
"$ref": "#/definitions/ListenAddr",
"description": "#/definitions/ListenAddr"
},
"path": {
"description": "Specify on which path you want to listen for invalidation endpoint.",
"type": "string"
}
},
"required": [
"listen",
"path"
],
"type": "object"
},
"JWTConf": {
"additionalProperties": false,
"properties": {
Expand Down Expand Up @@ -5571,11 +5594,17 @@ expression: "&schema"
"description": "Per subgraph configuration for entity caching",
"properties": {
"enabled": {
"default": true,
"description": "activates caching for this subgraph, overrides the global configuration",
"nullable": true,
"type": "boolean"
},
"invalidation": {
"$ref": "#/definitions/SubgraphInvalidationConfig",
"description": "#/definitions/SubgraphInvalidationConfig",
"nullable": true
},
"private_id": {
"default": null,
"description": "Context key used to separate cache sections per user",
"nullable": true,
"type": "string"
Expand Down Expand Up @@ -5779,6 +5808,22 @@ expression: "&schema"
},
"type": "object"
},
"SubgraphInvalidationConfig": {
"additionalProperties": false,
"properties": {
"enabled": {
"default": false,
"description": "Enable the invalidation",
"type": "boolean"
},
"shared_key": {
"default": "",
"description": "Shared key needed to request the invalidation endpoint",
"type": "string"
}
},
"type": "object"
},
"SubgraphPassthroughMode": {
"additionalProperties": false,
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ preview_entity_cache:
timeout: 5ms
ttl: 60s
enabled: true
invalidation:
listen: "127.0.0.1:4000"
path: /invalidation
subgraph:
subgraphs:
accounts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ preview_entity_cache:
urls: [ "redis://localhost:6379" ]
timeout: 5ms
ttl: 60s
invalidation:
listen: 127.0.0.1:4000
path: /invalidation
subgraph:
all:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ preview_entity_cache:
timeout: 5ms
ttl: 60s
enabled: true
invalidation:
listen: 127.0.0.1:4000
path: /invalidation
subgraphs:
accounts:
enabled: false
Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ where
}
#[allow(clippy::collapsible_if)]
if topic_to_delete {
tracing::trace!("deleting subscription from unsubscribe");
if self.subscriptions.remove(&topic).is_some() {
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
Expand Down Expand Up @@ -880,6 +881,7 @@ where

// Send error message to all killed connections
for (_subscriber_id, subscription) in closed_subs {
tracing::trace!("deleting subscription from kill_dead_topics");
i64_up_down_counter!(
"apollo_router_opened_subscriptions",
"Number of opened subscriptions",
Expand Down Expand Up @@ -907,7 +909,7 @@ where
}

fn force_delete(&mut self, topic: K) {
tracing::trace!("deleting subscription");
tracing::trace!("deleting subscription from force_delete");
let sub = self.subscriptions.remove(&topic);
if let Some(sub) = sub {
i64_up_down_counter!(
Expand Down
102 changes: 92 additions & 10 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;

use http::header;
use http::header::CACHE_CONTROL;
use multimap::MultiMap;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
Expand All @@ -26,6 +27,9 @@ use tracing::Level;
use super::cache_control::CacheControl;
use super::invalidation::Invalidation;
use super::invalidation::InvalidationOrigin;
use super::invalidation_endpoint::InvalidationEndpointConfig;
use super::invalidation_endpoint::InvalidationService;
use super::invalidation_endpoint::SubgraphInvalidationConfig;
use super::metrics::CacheMetricContextKey;
use super::metrics::CacheMetricsService;
use crate::batching::BatchQuery;
Expand All @@ -49,6 +53,8 @@ use crate::services::subgraph;
use crate::services::supergraph;
use crate::spec::TYPENAME;
use crate::Context;
use crate::Endpoint;
use crate::ListenAddr;

/// Change this key if you introduce a breaking change in entity caching algorithm to make sure it won't take the previous entries
pub(crate) const ENTITY_CACHE_VERSION: &str = "1.0";
Expand All @@ -61,6 +67,7 @@ register_plugin!("apollo", "preview_entity_cache", EntityCache);
#[derive(Clone)]
pub(crate) struct EntityCache {
storage: Option<RedisCacheStorage>,
endpoint_config: Option<Arc<InvalidationEndpointConfig>>,
subgraphs: Arc<SubgraphConfiguration<Subgraph>>,
entity_type: Option<String>,
enabled: bool,
Expand All @@ -78,25 +85,43 @@ pub(crate) struct Config {
#[serde(default)]
enabled: bool,

/// Configure invalidation per subgraph
subgraph: SubgraphConfiguration<Subgraph>,

/// Global invalidation configuration
invalidation: Option<InvalidationEndpointConfig>,

/// Entity caching evaluation metrics
#[serde(default)]
metrics: Metrics,
}

/// Per subgraph configuration for entity caching
#[derive(Clone, Debug, Default, JsonSchema, Deserialize, Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
#[derive(Clone, Debug, JsonSchema, Deserialize, Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields, default)]
pub(crate) struct Subgraph {
/// expiration for all keys for this subgraph, unless overriden by the `Cache-Control` header in subgraph responses
pub(crate) ttl: Option<Ttl>,

/// activates caching for this subgraph, overrides the global configuration
pub(crate) enabled: Option<bool>,
pub(crate) enabled: bool,

/// Context key used to separate cache sections per user
pub(crate) private_id: Option<String>,

/// Invalidation configuration
pub(crate) invalidation: Option<SubgraphInvalidationConfig>,
}

impl Default for Subgraph {
fn default() -> Self {
Self {
enabled: true,
ttl: Default::default(),
private_id: Default::default(),
invalidation: Default::default(),
}
}
}

/// Per subgraph configuration for entity caching
Expand Down Expand Up @@ -179,12 +204,29 @@ impl Plugin for EntityCache {
.into());
}

if init
.config
.subgraph
.all
.invalidation
.as_ref()
.map(|i| i.shared_key.is_empty())
.unwrap_or_default()
{
return Err(
"you must set a default shared_key invalidation for all subgraphs"
.to_string()
.into(),
);
}

let invalidation = Invalidation::new(storage.clone()).await?;

Ok(Self {
storage,
entity_type,
enabled: init.config.enabled,
endpoint_config: init.config.invalidation.clone().map(Arc::new),
subgraphs: Arc::new(init.config.subgraph),
metrics: init.config.metrics,
private_queries: Arc::new(RwLock::new(HashSet::new())),
Expand Down Expand Up @@ -240,13 +282,8 @@ impl Plugin for EntityCache {
.clone()
.map(|t| t.0)
.or_else(|| storage.ttl());
let subgraph_enabled = self.enabled
&& self
.subgraphs
.get(name)
.enabled
// if the top level `enabled` is true but there is no other configuration, caching is enabled for this plugin
.unwrap_or(true);
let subgraph_enabled =
self.enabled && (self.subgraphs.all.enabled || self.subgraphs.get(name).enabled);
let private_id = self.subgraphs.get(name).private_id.clone();

let name = name.to_string();
Expand Down Expand Up @@ -300,6 +337,40 @@ impl Plugin for EntityCache {
.boxed()
}
}

fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
let mut map = MultiMap::new();
if self.enabled
&& self
.subgraphs
.all
.invalidation
.as_ref()
.map(|i| i.enabled)
.unwrap_or_default()
{
match &self.endpoint_config {
Some(endpoint_config) => {
let endpoint = Endpoint::from_router_service(
endpoint_config.path.clone(),
InvalidationService::new(self.subgraphs.clone(), self.invalidation.clone())
.boxed(),
);
tracing::info!(
"Entity caching invalidation endpoint listening on: {}{}",
endpoint_config.listen,
endpoint_config.path
);
map.insert(endpoint_config.listen.clone(), endpoint);
}
None => {
tracing::warn!("Cannot start entity caching invalidation endpoint because the listen address and endpoint is not configured");
}
}
}

map
}
}

impl EntityCache {
Expand All @@ -311,6 +382,10 @@ impl EntityCache {
where
Self: Sized,
{
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;

let invalidation = Invalidation::new(Some(storage.clone())).await?;
Ok(Self {
storage: Some(storage),
Expand All @@ -322,6 +397,13 @@ impl EntityCache {
}),
metrics: Metrics::default(),
private_queries: Default::default(),
endpoint_config: Some(Arc::new(InvalidationEndpointConfig {
path: String::from("/invalidation"),
listen: ListenAddr::SocketAddr(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
4000,
)),
})),
invalidation,
})
}
Expand Down
Loading

0 comments on commit d940c3f

Please sign in to comment.