From a0a4a27e9ca8bac025537429d5830d6c74454784 Mon Sep 17 00:00:00 2001 From: ssebo Date: Mon, 20 Feb 2023 19:41:46 +0800 Subject: [PATCH] feat: add analysis url --- Cargo.lock | 16 ++-- Cargo.toml | 6 +- Dockerfile | 4 +- examples/local_file.rs | 4 + resources/fixtures/repo.json | 15 ++++ src/base.rs | 26 +++++- src/http/handler.rs | 164 ++++++++++++++++++++++++++--------- src/http/mod.rs | 17 ++++ src/main.rs | 1 + src/repo.rs | 34 ++++++++ 10 files changed, 233 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be93799..eb49543 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -386,9 +386,9 @@ dependencies = [ [[package]] name = "feature-probe-event" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3710aaf416257e689306e097885967536d3ea79f947fac7efc1b441428205ff" +checksum = "9ef547aae6c8bf890c41149452fdd3bcd5a971054079b42a089fd5752ee8b31c" dependencies = [ "axum", "headers", @@ -404,7 +404,7 @@ dependencies = [ [[package]] name = "feature-probe-server" -version = "1.3.13" +version = "2.0.1" dependencies = [ "anyhow", "axum", @@ -434,9 +434,9 @@ dependencies = [ [[package]] name = "feature-probe-server-sdk" -version = "1.2.9" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce700602261f8115158f868c75989e0e8e3e12d375a710f8d70fbedbc12911f2" +checksum = "b00a38fd3da26c8517b851a4e648056dd7e5d8ac23236c00804e12b46d401b07" dependencies = [ "anyhow", "byteorder", @@ -1711,9 +1711,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", "bytes", @@ -1726,7 +1726,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b037912..c42912a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] edition = "2021" name = "feature-probe-server" -version = "1.3.13" +version = "2.0.1" license = "Apache-2.0" authors = ["maintain@featureprobe.com"] description = "FeatureProbe Server for evaluating feature toggles" @@ -51,11 +51,11 @@ tracing-subscriber = { version = "0.3", features = [ ] } url = "2.3" socketio-rs = { optional = true, version = "0.1.7", default-features = false, features = ["server"] } -feature-probe-server-sdk = { version = "1.2.9", features = [ +feature-probe-server-sdk = { version="1.2.12", features = [ "internal", "use_tokio", ], default-features = false } -feature-probe-event = { version = "1.0.5", features = [ +feature-probe-event = { version="1.1.3", features = [ "use_tokio", "collector", ], default-features = false } diff --git a/Dockerfile b/Dockerfile index 4c4d1d9..582fa47 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,11 +7,13 @@ RUN update-ca-certificates WORKDIR /app COPY . /app +WORKDIR /app/server RUN rustc -V RUN cargo build --release --verbose FROM debian:buster-slim -COPY --from=build /app/target/release/feature_probe_server . +COPY --from=build /app/server/target/release/feature_probe_server . CMD [ "./feature_probe_server" ] + diff --git a/examples/local_file.rs b/examples/local_file.rs index de4ed27..eb44383 100644 --- a/examples/local_file.rs +++ b/examples/local_file.rs @@ -30,11 +30,14 @@ async fn main() { )) .unwrap(); let events_url = Url::parse(&format!("http://0.0.0.0:{}/api/events", api_port)).unwrap(); + let analysis_url = + Some(Url::parse(&format!("http://0.0.0.0:{}/analysis/events", api_port)).unwrap()); let refresh_seconds = Duration::from_secs(1); let config = ServerConfig { toggles_url, events_url: events_url.clone(), keys_url: None, + analysis_url: None, refresh_interval: refresh_seconds, client_sdk_key: Some(client_sdk_key.clone()), server_sdk_key: Some(server_sdk_key.clone()), @@ -57,6 +60,7 @@ async fn main() { let feature_probe_server = FpHttpHandler { repo: repo.clone(), events_url, + analysis_url, events_timeout: refresh_seconds, http_client: Default::default(), }; diff --git a/resources/fixtures/repo.json b/resources/fixtures/repo.json index 3f3138a..718b770 100644 --- a/resources/fixtures/repo.json +++ b/resources/fixtures/repo.json @@ -334,5 +334,20 @@ } ] } + }, + "events": { + "c52d1e0fd380f432a21f4743d59b26a0": { + "name": "c52d1e0fd380f432a21f4743d59b26a0", + "type": "PAGE_VIEW", + "matcher": "SIMPLE", + "url": "https://127.0.0.1/test" + }, + "b62d1e0fd380f432a21f47sss29b26a0": { + "name": "b62d1e0fd380f432a21f47sss29b26a0", + "type": "CLICK", + "matcher": "SIMPLE", + "url": "https://127.0.0.1/test", + "selector": "#DEMO" + } } } diff --git a/src/base.rs b/src/base.rs index fca31eb..c084e7e 100644 --- a/src/base.rs +++ b/src/base.rs @@ -1,4 +1,5 @@ use feature_probe_server_sdk::Url; +use log::warn; use serde::Deserialize; use std::{fmt::Display, time::Duration}; use thiserror::Error; @@ -30,6 +31,7 @@ pub struct ServerConfig { pub toggles_url: Url, pub events_url: Url, pub keys_url: Option, + pub analysis_url: Option, pub refresh_interval: Duration, pub server_sdk_key: Option, pub client_sdk_key: Option, @@ -103,6 +105,22 @@ impl ServerConfig { } }; + let analysis_url = match config.get_string("analysis_url") { + Ok(url) => match Url::parse(&url) { + Err(e) => { + return Err(FPServerError::ConfigError(format!( + "INVALID FP_ANALYSIS_URL: {}", + e, + ))) + } + Ok(u) => Some(u), + }, + Err(_) => { + warn!("NOT SET FP_ANALYSIS_URL"); + None + } + }; + let refresh_interval = match config.get_int("refresh_seconds") { Err(_) => { return Err(FPServerError::ConfigError( @@ -131,6 +149,7 @@ impl ServerConfig { Ok(ServerConfig { toggles_url, events_url, + analysis_url, keys_url, refresh_interval, client_sdk_key, @@ -150,10 +169,15 @@ impl Display for ServerConfig { None => "None".to_owned(), Some(s) => s.to_string(), }; - write!(f, "server_port {}, toggles_url {}, events_url {}, keys_url {}, refresh_interval {:?}, client_sdk_key {:?}, server_sdk_key {:?}", + let analysis_url = match &self.analysis_url { + None => "None".to_owned(), + Some(s) => s.to_string(), + }; + write!(f, "server_port {}, toggles_url {}, events_url {}, analysis_url {}, keys_url {}, refresh_interval {:?}, client_sdk_key {:?}, server_sdk_key {:?}", self.server_port, self.toggles_url, self.events_url, + analysis_url, keys_url, self.refresh_interval, self.client_sdk_key, diff --git a/src/http/handler.rs b/src/http/handler.rs index 2558eb2..de691e8 100644 --- a/src/http/handler.rs +++ b/src/http/handler.rs @@ -10,16 +10,15 @@ use axum::{ response::{IntoResponse, Response}, Json, TypedHeader, }; -use feature_probe_event::{ - collector::{EventHandler, FPEventError}, - event::PackedData, -}; +use feature_probe_event::collector::{EventHandler, FPEventError}; use feature_probe_server_sdk::{FPUser, Repository, SyncType, Url}; +use log::info; use parking_lot::Mutex; use reqwest::{ header::{self, AUTHORIZATION, USER_AGENT}, Client, Method, }; +use serde_json::Value; use std::{ collections::{HashMap, VecDeque}, fs, @@ -27,7 +26,7 @@ use std::{ sync::Arc, time::Duration, }; -use tracing::{debug, error}; +use tracing::debug; #[async_trait] pub trait HttpHandler { @@ -42,6 +41,11 @@ pub trait HttpHandler { TypedHeader(SdkAuthorization(sdk_key)): TypedHeader, ) -> Result; + async fn client_sdk_events( + &self, + TypedHeader(SdkAuthorization(sdk_key)): TypedHeader, + ) -> Result; + async fn update_toggles( &self, Json(params): Json, @@ -67,6 +71,7 @@ pub struct FpHttpHandler { pub repo: Arc, pub http_client: Arc, pub events_url: Url, + pub analysis_url: Option, pub events_timeout: Duration, } @@ -110,18 +115,26 @@ impl HttpHandler for FpHttpHandler { match self.repo.client_sdk_eval_string(&sdk_key, &user) { Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()), Err(e) => match e { - NotReady(_) => Ok(( - StatusCode::SERVICE_UNAVAILABLE, - [(header::CONTENT_TYPE, "application/json")], - "{}", - ) - .into_response()), - NotFound(_) => Ok(( - StatusCode::OK, - [(header::CONTENT_TYPE, "application/json")], - "{}", - ) - .into_response()), + NotReady(_) => { + Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response()) + } + NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()), + _ => Err(e), + }, + } + } + + async fn client_sdk_events( + &self, + TypedHeader(SdkAuthorization(sdk_key)): TypedHeader, + ) -> Result { + match self.repo.client_sdk_events_string(&sdk_key) { + Ok(body) => Ok((StatusCode::OK, cors_headers(), body).into_response()), + Err(e) => match e { + NotReady(_) => { + Ok((StatusCode::SERVICE_UNAVAILABLE, cors_headers(), "{}").into_response()) + } + NotFound(_) => Ok((StatusCode::OK, cors_headers(), "{}").into_response()), _ => Err(e), }, } @@ -188,39 +201,95 @@ impl EventHandler for FpHttpHandler { sdk_key: String, user_agent: String, headers: HeaderMap, - data: VecDeque, + data: VecDeque, ) -> Result { let http_client = self.http_client.clone(); let events_url = self.events_url.clone(); + let analysis_url = self.analysis_url.clone(); let timeout = self.events_timeout; - tokio::spawn(async move { - let ua = headers.get("ua"); - let auth = SdkAuthorization(sdk_key).encode(); - let mut headers = HeaderMap::with_capacity(3); - headers.append(AUTHORIZATION, auth); - if let Ok(v) = HeaderValue::from_str(&user_agent) { - headers.append(USER_AGENT, v); + let server_sdk_key = { + if sdk_key.starts_with("client") { + match self.repo.secret_keys().get(sdk_key.as_str()) { + Some(key) => key.clone(), + None => { + return Ok((StatusCode::UNAUTHORIZED, cors_headers(), "{}").into_response()) + } + } + } else { + sdk_key.clone() } + }; + let headers = build_header(headers, server_sdk_key, &user_agent); + send_events( + headers, + events_url, + analysis_url, + timeout, + data, + http_client, + ) + .await; + Ok((StatusCode::OK, cors_headers(), "{}").into_response()) + } +} - if let Some(ua) = ua { - headers.append("ua", ua.clone()); - } +async fn send_events( + headers: HeaderMap, + events_url: Url, + analysis_url: Option, + timeout: Duration, + data: VecDeque, + http_client: Arc, +) { + if let Some(analysis_url) = analysis_url { + let headers = headers.clone(); + let data = data.clone(); + let http_client = http_client.clone(); + tokio::spawn(async move { + do_send_events(headers, analysis_url, timeout, data, http_client).await; + }); + } - let request = http_client - .request(Method::POST, events_url.clone()) - .headers(headers) - .timeout(timeout) - .json(&data); + tokio::spawn(async move { + do_send_events(headers, events_url, timeout, data, http_client).await; + }); +} - debug!("event post req: {:?}", request); +async fn do_send_events( + headers: HeaderMap, + url: Url, + timeout: Duration, + data: VecDeque, + http_client: Arc, +) { + let traffic_request = http_client + .request(Method::POST, url.clone()) + .headers(headers.clone()) + .timeout(timeout) + .json(&data); - match request.send().await { - Err(e) => error!("event post error: {}", e), - Ok(r) => debug!("event post success: {:?}", r), - }; - }); - Ok((StatusCode::OK, cors_headers(), "{}").into_response()) + debug!("traffic post data: {data:?}"); + debug!("traffic post req: {traffic_request:?}"); + + match traffic_request.send().await { + Err(e) => info!("traffic post error: {}", e), + Ok(r) => info!("traffic post success: {:?}", r), + }; +} + +fn build_header(headers: HeaderMap, sdk_key: String, user_agent: &str) -> HeaderMap { + let ua = headers.get("ua"); + let auth = SdkAuthorization(sdk_key).encode(); + let mut headers = HeaderMap::with_capacity(3); + headers.append(AUTHORIZATION, auth); + if let Ok(v) = HeaderValue::from_str(user_agent) { + headers.append(USER_AGENT, v); } + + if let Some(ua) = ua { + headers.append("ua", ua.clone()); + } + headers } fn decode_user(user: String) -> Result { @@ -290,6 +359,19 @@ impl HttpHandler for LocalFileHttpHandlerForTest { .into_response()) } + async fn client_sdk_events( + &self, + TypedHeader(SdkAuthorization(_sdk_key)): TypedHeader, + ) -> Result { + let body = "{}".to_owned(); + Ok(( + StatusCode::OK, + [(header::CONTENT_TYPE, "application/json")], + body, + ) + .into_response()) + } + async fn update_toggles( &self, Json(_params): Json, @@ -339,7 +421,7 @@ impl EventHandler for LocalFileHttpHandlerForTest { _sdk_key: String, _user_agent: String, _headers: HeaderMap, - _data: VecDeque, + _data: VecDeque, ) -> Result { Ok((StatusCode::OK, cors_headers(), "").into_response()) } diff --git a/src/http/mod.rs b/src/http/mod.rs index e61c4cb..92072d6 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -33,6 +33,10 @@ where "/api/client-sdk/toggles", get(client_sdk_toggles::).options(client_cors), ) + .route( + "/api/client-sdk/events", + get(client_sdk_events::).options(client_cors), + ) .route("/api/server-sdk/toggles", get(server_sdk_toggles::)) .route("/api/events", post(post_events::).options(client_cors)) .route("/internal/all_secrets", get(all_secrets::)) // not for public network @@ -67,6 +71,16 @@ where handler.client_sdk_toggles(params, sdk_key).await } +async fn client_sdk_events( + sdk_key: TypedHeader, + Extension(handler): Extension, +) -> Result +where + T: HttpHandler + Clone + Send + Sync + 'static, +{ + handler.client_sdk_events(sdk_key).await +} + async fn server_sdk_toggles( sdk_key: TypedHeader, Extension(handler): Extension, @@ -428,10 +442,12 @@ mod tests { .unwrap(); let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", target_port)).unwrap(); + let analysis_url = None; let config = ServerConfig { toggles_url, events_url: events_url.clone(), refresh_interval: Duration::from_secs(1), + analysis_url: None, keys_url: None, client_sdk_key: Some(client_sdk_key.to_owned()), server_sdk_key: Some(server_sdk_key.to_owned()), @@ -455,6 +471,7 @@ mod tests { let feature_probe_server = FpHttpHandler { repo: repo.clone(), events_url, + analysis_url, events_timeout: Duration::from_secs(10), http_client: Default::default(), }; diff --git a/src/main.rs b/src/main.rs index 8cdd78e..4cc86be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -121,6 +121,7 @@ fn init_handler( repo: Arc::new(repo), http_client: Default::default(), events_url: server_config.events_url, + analysis_url: server_config.analysis_url, events_timeout: server_config.refresh_interval, }) } diff --git a/src/repo.rs b/src/repo.rs index 6eb04de..7443a10 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -147,6 +147,18 @@ impl SdkRepository { self.inner.all_evaluated_string(server_sdk_key, user) } + pub fn client_sdk_events_string(&self, client_sdk_key: &str) -> Result { + let secret_mapping = self.inner.secret_mapping.read(); + if secret_mapping.version() == 0 { + return Err(FPServerError::NotReady(client_sdk_key.to_string())); + } + let server_sdk_key = match secret_mapping.server_sdk_key(client_sdk_key) { + Some(sdk_key) => sdk_key, + None => return Err(FPServerError::NotFound(client_sdk_key.to_string())), + }; + self.inner.all_event_string(server_sdk_key) + } + pub fn client_sync_now(&self, sdk_key: &str, t: SyncType) -> Result { let sdk_clients = self.inner.sdk_clients.write(); let client = match sdk_clients.get(sdk_key) { @@ -283,6 +295,20 @@ impl Inner { .collect(); serde_json::to_string(&map).map_err(|e| FPServerError::JsonError(e.to_string())) } + + fn all_event_string(&self, sdk_key: &str) -> Result { + let clients = self.sdk_clients.read(); + let client = match clients.get(sdk_key) { + Some(client) if !client.initialized() => { + return Err(FPServerError::NotReady(sdk_key.to_string())) + } + Some(client) => client, + None => return Err(FPServerError::NotReady(sdk_key.to_string())), + }; + let arc_repo = client.repo(); + let repo = arc_repo.read(); + serde_json::to_string(&repo.events).map_err(|e| FPServerError::JsonError(e.to_string())) + } } #[cfg(test)] @@ -351,6 +377,8 @@ mod tests { let repo_string_err = repository.server_sdk_repo_string(&non_sdk_key); assert_eq!(repo_string_err.err(), Some(NotFound(non_sdk_key))); + let events_string = repository.client_sdk_events_string(&client_sdk_key); + assert!(events_string.is_ok()); let repo_string = repository.server_sdk_repo_string(&server_sdk_key); assert!(repo_string.is_ok()); let r = serde_json::from_str::(&repo_string.unwrap()).unwrap(); @@ -409,9 +437,11 @@ mod tests { let toggles_url = Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap(); let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap(); + let analysis_url = None; let config = ServerConfig { toggles_url, events_url, + analysis_url, refresh_interval: Duration::from_secs(1), client_sdk_key: Some(client_sdk_key.to_owned()), server_sdk_key: Some(server_sdk_key.to_owned()), @@ -444,9 +474,11 @@ mod tests { let toggles_url = Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap(); let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap(); + let analysis_url = None; let config = ServerConfig { toggles_url, events_url, + analysis_url, refresh_interval: Duration::from_secs(1), client_sdk_key: Some(client_sdk_key.to_owned()), server_sdk_key: Some(server_sdk_key.to_owned()), @@ -476,9 +508,11 @@ mod tests { Url::parse(&format!("http://127.0.0.1:{}/api/server-sdk/toggles", port)).unwrap(); let events_url = Url::parse(&format!("http://127.0.0.1:{}/api/events", port)).unwrap(); let keys_url = Url::parse(&format!("http://127.0.0.1:{}/api/secret-keys", port)).unwrap(); + let analysis_url = None; let config = ServerConfig { toggles_url, events_url, + analysis_url, refresh_interval: Duration::from_millis(100), client_sdk_key: None, server_sdk_key: None,