feat(cache): add support for surrogate cache key (#6234)
Signed-off-by: Benjamin <[email protected]>
Co-authored-by: Jesse Rosenberger <[email protected]>
bnjjj and abernix authored Nov 20, 2024
1 parent ec784f1 commit 83e6291
Showing 26 changed files with 780 additions and 121 deletions.
@@ -1701,6 +1701,11 @@ snapshot_kind: text
"description": "Enable or disable the entity caching feature",
"type": "boolean"
},
"expose_keys_in_context": {
"default": false,
"description": "Expose cache keys in context",
"type": "boolean"
},
"invalidation": {
"$ref": "#/definitions/InvalidationEndpointConfig",
"description": "#/definitions/InvalidationEndpointConfig",
26 changes: 18 additions & 8 deletions apollo-router/src/plugins/cache/cache_control.rs
@@ -157,7 +157,24 @@ impl CacheControl {
Ok(result)
}

/// Fill the header map with the `cache-control` header and, when non-zero, the `age` header
pub(crate) fn to_headers(&self, headers: &mut HeaderMap) -> Result<(), BoxError> {
headers.insert(
CACHE_CONTROL,
HeaderValue::from_str(&self.to_cache_control_header()?)?,
);

if let Some(age) = self.age {
if age != 0 {
headers.insert(AGE, age.into());
}
}

Ok(())
}

/// Build only the `cache-control` header value, without setting the `age` header
pub(crate) fn to_cache_control_header(&self) -> Result<String, BoxError> {
let mut s = String::new();
let mut prev = false;
let now = now_epoch_seconds();
@@ -228,15 +245,8 @@ impl CacheControl {
if self.stale_if_error {
write!(&mut s, "{}stale-if-error", if prev { "," } else { "" },)?;
}
headers.insert(CACHE_CONTROL, HeaderValue::from_str(&s)?);

if let Some(age) = self.age {
if age != 0 {
headers.insert(AGE, age.into());
}
}

Ok(())
Ok(s)
}

pub(super) fn no_store() -> Self {
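The change above splits header construction in two: `to_headers` still writes both the `cache-control` and `age` headers onto an HTTP response, while the new `to_cache_control_header` returns only the `cache-control` string so it can be recorded next to a cache key in the context. A minimal standalone sketch of the same header-writing pattern; the `apply_cache_headers` function and the example value are illustrative, not part of the router API:

```rust
use http::header::{HeaderMap, HeaderValue, AGE, CACHE_CONTROL};

// Hypothetical helper mirroring the split above: the cache-control string is
// produced separately (as by `to_cache_control_header`) and applied here.
fn apply_cache_headers(
    cache_control_value: &str, // e.g. "max-age=60,public"
    age: Option<u32>,
    headers: &mut HeaderMap,
) -> Result<(), Box<dyn std::error::Error>> {
    headers.insert(CACHE_CONTROL, HeaderValue::from_str(cache_control_value)?);
    // As in `to_headers`, the `age` header is only written when non-zero.
    if let Some(age) = age {
        if age != 0 {
            headers.insert(AGE, age.into());
        }
    }
    Ok(())
}
```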
193 changes: 193 additions & 0 deletions apollo-router/src/plugins/cache/entity.rs
@@ -51,6 +51,7 @@ use crate::plugins::authorization::CacheKeyMetadata;
use crate::query_planner::fetch::QueryHash;
use crate::query_planner::OperationKind;
use crate::services::subgraph;
use crate::services::subgraph::SubgraphRequestId;
use crate::services::supergraph;
use crate::spec::TYPENAME;
use crate::Context;
@@ -62,6 +63,8 @@ pub(crate) const ENTITY_CACHE_VERSION: &str = "1.0";
pub(crate) const ENTITIES: &str = "_entities";
pub(crate) const REPRESENTATIONS: &str = "representations";
pub(crate) const CONTEXT_CACHE_KEY: &str = "apollo_entity_cache::key";
/// Context key under which cache keys are exposed to support surrogate cache keys
pub(crate) const CONTEXT_CACHE_KEYS: &str = "apollo::entity_cache::cached_keys_status";

register_plugin!("apollo", "preview_entity_cache", EntityCache);

@@ -73,6 +76,7 @@ pub(crate) struct EntityCache {
entity_type: Option<String>,
enabled: bool,
metrics: Metrics,
expose_keys_in_context: bool,
private_queries: Arc<RwLock<HashSet<String>>>,
pub(crate) invalidation: Invalidation,
}
@@ -96,6 +100,10 @@ pub(crate) struct Config {
#[serde(default)]
enabled: bool,

#[serde(default)]
/// Expose cache keys in context
expose_keys_in_context: bool,

/// Configure invalidation per subgraph
subgraph: SubgraphConfiguration<Subgraph>,

@@ -297,6 +305,7 @@ impl Plugin for EntityCache {
storage,
entity_type,
enabled: init.config.enabled,
expose_keys_in_context: init.config.expose_keys_in_context,
endpoint_config: init.config.invalidation.clone().map(Arc::new),
subgraphs: Arc::new(init.config.subgraph),
metrics: init.config.metrics,
@@ -390,6 +399,7 @@ impl Plugin for EntityCache {
private_queries,
private_id,
invalidation: self.invalidation.clone(),
expose_keys_in_context: self.expose_keys_in_context,
})));
tower::util::BoxService::new(inner)
} else {
@@ -467,6 +477,7 @@ impl EntityCache {
storage,
entity_type: None,
enabled: true,
expose_keys_in_context: true,
subgraphs: Arc::new(SubgraphConfiguration {
all: Subgraph::default(),
subgraphs,
@@ -496,6 +507,7 @@ struct InnerCacheService {
subgraph_ttl: Option<Duration>,
private_queries: Arc<RwLock<HashSet<String>>>,
private_id: Option<String>,
expose_keys_in_context: bool,
invalidation: Invalidation,
}

@@ -567,6 +579,7 @@ impl InnerCacheService {
self.storage.clone(),
is_known_private,
private_id.as_deref(),
self.expose_keys_in_context,
request,
)
.instrument(tracing::info_span!("cache.entity.lookup"))
@@ -614,6 +627,7 @@ impl InnerCacheService {

if private_id.is_none() {
// The response has a private scope but we have no way to differentiate users, so we do not store the response in the cache.
// We also don't record this cache key in the context, since the response will never be cached.
return Ok(response);
}
}
@@ -638,6 +652,7 @@ impl InnerCacheService {
&response,
cache_control,
root_cache_key,
self.expose_keys_in_context,
)
.await?;
}
@@ -663,12 +678,14 @@ impl InnerCacheService {
Ok(response)
}
} else {
let request_id = request.id.clone();
match cache_lookup_entities(
self.name.clone(),
self.storage.clone(),
is_known_private,
private_id.as_deref(),
request,
self.expose_keys_in_context,
)
.instrument(tracing::info_span!("cache.entity.lookup"))
.await?
@@ -701,6 +718,20 @@ impl InnerCacheService {
&[graphql_error],
&mut cache_result.0,
);
if self.expose_keys_in_context {
// Keep only the already-cached keys for the surrogate cache key, since the new data could not be fetched
context.upsert::<_, CacheKeysContext>(
CONTEXT_CACHE_KEYS,
|mut value| {
if let Some(cache_keys) = value.get_mut(&request_id) {
cache_keys.retain(|cache_key| {
matches!(cache_key.status, CacheKeyStatus::Cached)
});
}
value
},
)?;
}

let mut data = Object::default();
data.insert(ENTITIES, new_entities.into());
@@ -727,6 +758,25 @@ impl InnerCacheService {
if let Some(control_from_cached) = cache_result.1 {
cache_control = cache_control.merge(&control_from_cached);
}
if self.expose_keys_in_context {
// Fill in the cache-control for the newly fetched keys used for the surrogate cache key, since that data did not come from the cache
let response_id = response.id.clone();
let cache_control_str = cache_control.to_cache_control_header()?;
response.context.upsert::<_, CacheKeysContext>(
CONTEXT_CACHE_KEYS,
|mut value| {
if let Some(cache_keys) = value.get_mut(&response_id) {
for cache_key in cache_keys
.iter_mut()
.filter(|c| matches!(c.status, CacheKeyStatus::New))
{
cache_key.cache_control = cache_control_str.clone();
}
}
value
},
)?;
}

if !is_known_private && cache_control.private() {
self.private_queries.write().await.insert(query.to_string());
@@ -797,6 +847,7 @@ async fn cache_lookup_root(
cache: RedisCacheStorage,
is_known_private: bool,
private_id: Option<&str>,
expose_keys_in_context: bool,
mut request: subgraph::Request,
) -> Result<ControlFlow<subgraph::Response, (subgraph::Request, String)>, BoxError> {
let body = request.subgraph_request.body_mut();
@@ -822,6 +873,36 @@
.context
.extensions()
.with_lock(|mut lock| lock.insert(control));
if expose_keys_in_context {
let request_id = request.id.clone();
let cache_control_header = value.0.control.to_cache_control_header()?;
request.context.upsert::<_, CacheKeysContext>(
CONTEXT_CACHE_KEYS,
|mut val| {
match val.get_mut(&request_id) {
Some(v) => {
v.push(CacheKeyContext {
key: key.clone(),
status: CacheKeyStatus::Cached,
cache_control: cache_control_header,
});
}
None => {
val.insert(
request_id,
vec![CacheKeyContext {
key: key.clone(),
status: CacheKeyStatus::Cached,
cache_control: cache_control_header,
}],
);
}
}

val
},
)?;
}

let mut response = subgraph::Response::builder()
.data(value.0.data)
@@ -851,6 +932,7 @@ async fn cache_lookup_entities(
is_known_private: bool,
private_id: Option<&str>,
mut request: subgraph::Request,
expose_keys_in_context: bool,
) -> Result<ControlFlow<subgraph::Response, (subgraph::Request, EntityCacheResults)>, BoxError> {
let body = request.subgraph_request.body_mut();

@@ -893,6 +975,46 @@
let (new_representations, cache_result, cache_control) =
filter_representations(&name, representations, keys, cache_result, &request.context)?;

if expose_keys_in_context {
let mut cache_entries = Vec::with_capacity(cache_result.len());
for intermediate_result in &cache_result {
match &intermediate_result.cache_entry {
Some(cache_entry) => {
cache_entries.push(CacheKeyContext {
key: intermediate_result.key.clone(),
status: CacheKeyStatus::Cached,
cache_control: cache_entry.control.to_cache_control_header()?,
});
}
None => {
cache_entries.push(CacheKeyContext {
key: intermediate_result.key.clone(),
status: CacheKeyStatus::New,
cache_control: match &cache_control {
Some(cc) => cc.to_cache_control_header()?,
None => CacheControl::default().to_cache_control_header()?,
},
});
}
}
}
let request_id = request.id.clone();
request
.context
.upsert::<_, CacheKeysContext>(CONTEXT_CACHE_KEYS, |mut v| {
match v.get_mut(&request_id) {
Some(cache_keys) => {
cache_keys.append(&mut cache_entries);
}
None => {
v.insert(request_id, cache_entries);
}
}

v
})?;
}

if !new_representations.is_empty() {
body.variables
.insert(REPRESENTATIONS, new_representations.into());
@@ -954,6 +1076,7 @@ async fn cache_store_root_from_response(
response: &subgraph::Response,
cache_control: CacheControl,
cache_key: String,
expose_keys_in_context: bool,
) -> Result<(), BoxError> {
if let Some(data) = response.response.body().data.as_ref() {
let ttl: Option<Duration> = cache_control
@@ -964,6 +1087,37 @@
if response.response.body().errors.is_empty() && cache_control.should_store() {
let span = tracing::info_span!("cache.entity.store");
let data = data.clone();
if expose_keys_in_context {
let response_id = response.id.clone();
let cache_control_header = cache_control.to_cache_control_header()?;

response
.context
.upsert::<_, CacheKeysContext>(CONTEXT_CACHE_KEYS, |mut val| {
match val.get_mut(&response_id) {
Some(v) => {
v.push(CacheKeyContext {
key: cache_key.clone(),
status: CacheKeyStatus::New,
cache_control: cache_control_header,
});
}
None => {
val.insert(
response_id,
vec![CacheKeyContext {
key: cache_key.clone(),
status: CacheKeyStatus::New,
cache_control: cache_control_header,
}],
);
}
}

val
})?;
}

tokio::spawn(async move {
cache
.insert(
@@ -1419,3 +1573,42 @@ fn assemble_response_from_errors(
}
(new_entities, new_errors)
}

pub(crate) type CacheKeysContext = HashMap<SubgraphRequestId, Vec<CacheKeyContext>>;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(test, derive(PartialEq, Eq, Hash, PartialOrd, Ord))]
pub(crate) struct CacheKeyContext {
key: String,
status: CacheKeyStatus,
cache_control: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(test, derive(PartialEq, Eq, Hash))]
#[serde(rename_all = "snake_case")]
pub(crate) enum CacheKeyStatus {
/// New cache key inserted in the cache
New,
/// Key that was already in the cache
Cached,
}

#[cfg(test)]
impl PartialOrd for CacheKeyStatus {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

#[cfg(test)]
impl Ord for CacheKeyStatus {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
match (self, other) {
(CacheKeyStatus::New, CacheKeyStatus::New) => std::cmp::Ordering::Equal,
(CacheKeyStatus::New, CacheKeyStatus::Cached) => std::cmp::Ordering::Greater,
(CacheKeyStatus::Cached, CacheKeyStatus::New) => std::cmp::Ordering::Less,
(CacheKeyStatus::Cached, CacheKeyStatus::Cached) => std::cmp::Ordering::Equal,
}
}
}
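
Taken together, `CacheKeysContext` is the value stored in the context under `apollo::entity_cache::cached_keys_status` when `expose_keys_in_context` is enabled: each subgraph request id maps to the cache keys it produced, each tagged `new` or `cached` along with its `cache-control` string. Below is a minimal sketch of how a consumer might deserialize that context value and collect the keys, for example to build a surrogate cache key header; the standalone types and the `surrogate_keys` helper are hypothetical, and the request id is kept as a plain string rather than the router-internal `SubgraphRequestId`:

```rust
use std::collections::HashMap;

use serde::Deserialize;
use serde_json::Value;

// Mirrors the shape of `CacheKeyContext` above; `status` deserializes as
// "new" or "cached" because of the snake_case rename on `CacheKeyStatus`.
#[derive(Debug, Deserialize)]
struct CacheKeyEntry {
    key: String,
    status: String,
    cache_control: String,
}

// Subgraph request id (assumed to serialize as a string) -> its cache keys.
type CacheKeys = HashMap<String, Vec<CacheKeyEntry>>;

// Collect every exposed key, e.g. to join into a Surrogate-Key header value.
fn surrogate_keys(context_value: &Value) -> Result<Vec<String>, serde_json::Error> {
    let keys: CacheKeys = serde_json::from_value(context_value.clone())?;
    Ok(keys
        .values()
        .flat_map(|entries| entries.iter().map(|entry| entry.key.clone()))
        .collect())
}
```

In practice this value would be read from the response context by a coprocessor or custom plugin; the exact integration point is outside this diff.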