Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: v2 migration Cloudflare KV #81

Merged
merged 9 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions src/attestation_store/cf_kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use {
super::{AttestationStore, Result},
crate::http_server::{CsrfToken, TokenManager},
async_trait::async_trait,
hyper::StatusCode,
reqwest::Url,
serde::Serialize,
std::time::Duration,
};

#[derive(Clone)]
pub struct CloudflareKv {
pub endpoint: Url,
pub token_manager: TokenManager,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attestation_store -> http_server dependency really stinks, but oh well it's going to be deprecated anyway 😄

pub http_client: reqwest::Client,
}

impl CloudflareKv {
pub fn new(endpoint: Url, token_manager: TokenManager) -> Self {
Self {
endpoint,
token_manager,
http_client: reqwest::Client::new(),
}
}
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct SetAttestationCompatBody<'a> {
attestation_id: &'a str,
origin: &'a str,
}

#[async_trait]
impl AttestationStore for CloudflareKv {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let url = self.endpoint.join("/attestation")?;
let res = self
.http_client
.post(url)
.header(
CsrfToken::header_name(),
self.token_manager
.generate_csrf_token()
.map_err(|e| anyhow::anyhow!("{e:?}"))?,
)
.json(&SetAttestationCompatBody {
attestation_id: id,
origin,
})
.timeout(Duration::from_secs(1))
.send()
.await?;
if res.status().is_success() {
Ok(())
} else {
Err(anyhow::anyhow!(
"Failed to set attestation: status:{} response body:{:?}",
res.status(),
res.text().await
))
}
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
let url = self
.endpoint
.join(&format!("/v1/compat-attestation/{id}"))?;
let response = self
.http_client
.get(url)
.timeout(Duration::from_secs(1))
.send()
.await?;
match response.status() {
status if status.is_success() => {
let value = response.text().await?;
Ok(Some(value))
}
StatusCode::NOT_FOUND => Ok(None),
status => Err(anyhow::anyhow!(
"Failed to get attestation: status:{status} response body:{:?}",
response.text().await
)),
}
}
}
44 changes: 44 additions & 0 deletions src/attestation_store/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use {
super::{cf_kv::CloudflareKv, AttestationStore, Result},
crate::util::redis,
async_trait::async_trait,
};

pub struct Store {
redis: redis::Adapter,
cf_kv: CloudflareKv,
}

impl Store {
pub fn new(redis: redis::Adapter, cf_kv: CloudflareKv) -> Self {
Self { redis, cf_kv }
}
}

#[async_trait]
impl AttestationStore for Store {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let redis_fut = self.redis.set_attestation(id, origin);
let cf_kv_fut = self.cf_kv.set_attestation(id, origin);
let (redis_res, cf_kv_res) = tokio::join!(redis_fut, cf_kv_fut);
if let Err(e) = cf_kv_res {
log::error!("Failed to set attestation in Cloudflare KV: {e} {e:?}");
}
redis_res
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
if let Some(attestation) = self.redis.get_attestation(id).await? {
Ok(Some(attestation))
} else {
let res = self.cf_kv.get_attestation(id).await;
match res {
Ok(a) => Ok(a),
Err(e) => {
log::error!("Failed to get attestation from Cloudflare KV: {e} {e:?}");
Ok(None)
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/attestation_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod cf_kv;
pub mod migration;
pub mod redis;

use async_trait::async_trait;
Expand Down
11 changes: 6 additions & 5 deletions src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ impl<S, G> Server<S, G> {
}
}

struct TokenManager {
#[derive(Clone)]
pub struct TokenManager {
encoding_key: jsonwebtoken::EncodingKey,
decoding_key: jsonwebtoken::DecodingKey,
}

impl TokenManager {
fn new(secret: &[u8]) -> Self {
pub fn new(secret: &[u8]) -> Self {
Self {
encoding_key: jsonwebtoken::EncodingKey::from_secret(secret),
decoding_key: jsonwebtoken::DecodingKey::from_secret(secret),
Expand Down Expand Up @@ -262,14 +263,14 @@ where
}

#[derive(Serialize, Deserialize)]
struct CsrfToken {
pub struct CsrfToken {
exp: usize,
}

impl CsrfToken {
// Using const value instead of a fn produces this warning:
// https://rust-lang.github.io/rust-clippy/master/index.html#declare_interior_mutable_const
const fn header_name() -> HeaderName {
pub const fn header_name() -> HeaderName {
HeaderName::from_static("x-csrf-token")
}

Expand All @@ -282,7 +283,7 @@ impl CsrfToken {
}

impl TokenManager {
fn generate_csrf_token(&self) -> Result<String, Response> {
pub fn generate_csrf_token(&self) -> Result<String, Response> {
use jsonwebtoken::{encode, get_current_timestamp, Header};

const TTL_SECS: usize = 60 * 60; // 1 hour
Expand Down
20 changes: 17 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use {
AXUM_HTTP_REQUESTS_DURATION_SECONDS,
},
bouncer::{
attestation_store::{cf_kv::CloudflareKv, migration},
event_sink,
http_server::{RequestInfo, ServerConfig},
http_server::{RequestInfo, ServerConfig, TokenManager},
project_registry::{self, CachedExt as _},
scam_guard,
util::redis,
Expand Down Expand Up @@ -55,6 +56,8 @@ pub struct Configuration {
pub data_api_auth_token: String,
pub scam_guard_cache_url: String,

pub cf_kv_endpoint: String,

pub secret: String,

pub s3_endpoint: Option<String>,
Expand Down Expand Up @@ -99,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> {
.install_recorder()
.context("Failed to install Prometheus metrics recorder")?;

let attestation_store = redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let attestation_store = {
let redis_attestation_store =
redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let cf_kv_attestation_store = CloudflareKv::new(
config
.cf_kv_endpoint
.parse()
.context("Failed to parse cf_kv_endpoint")?,
TokenManager::new(config.secret.as_bytes()),
);
migration::Store::new(redis_attestation_store, cf_kv_attestation_store)
};

let project_registry_cache = redis::new(
"project_registry_cache",
Expand Down
2 changes: 2 additions & 0 deletions terraform/ecs/cluster.tf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ resource "aws_ecs_task_definition" "app_task" {
{ name = "PROJECT_REGISTRY_CACHE_URL", value = var.project_registry_cache_url },
{ name = "SCAM_GUARD_CACHE_URL", value = var.scam_guard_cache_url },

{ name = "CF_KV_ENDPOINT", value = var.cf_kv_endpoint },

{ name = "DATA_LAKE_BUCKET", value = var.analytics_datalake_bucket_name },

{ name = "BLOCKED_COUNTRIES", value = var.ofac_blocked_countries },
Expand Down
5 changes: 5 additions & 0 deletions terraform/ecs/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ variable "scam_guard_cache_url" {
type = string
}

variable "cf_kv_endpoint" {
description = "The endpoint of the Cloudflare KV worker"
type = string
}

variable "ofac_blocked_countries" {
description = "The list of countries under OFAC sanctions"
type = string
Expand Down
2 changes: 2 additions & 0 deletions terraform/res_ecs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ module "ecs" {
project_registry_cache_url = "redis://${module.redis.endpoint}/1"
scam_guard_cache_url = "redis://${module.redis.endpoint}/2"

cf_kv_endpoint = var.cf_kv_endpoint

ofac_blocked_countries = var.ofac_blocked_countries

# Analytics
Expand Down
7 changes: 7 additions & 0 deletions terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ variable "ofac_blocked_countries" {
default = ""
}

#-------------------------------------------------------------------------------
# Cloudflare KV for V2 migration

variable "cf_kv_endpoint" {
description = "The endpoint of the Cloudflare KV worker"
type = string
}

#-------------------------------------------------------------------------------
# Project Registry
Expand Down