diff --git a/Dockerfile b/Dockerfile index bcf1b80e..ea80517f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,10 @@ COPY ./rust-connector-sdk . RUN cargo build --release FROM debian:buster-slim as connector +RUN apt-get update \ + && DEBIAN_FRONTEND=noninteractive \ + apt-get install --no-install-recommends --assume-yes\ + libssl-dev COPY --from=build /app/target/release/ndc_hub_example ./ndc_hub_example ENTRYPOINT [ "/ndc_hub_example" ] -CMD [ "serve", "--port", "8080" ] +CMD [ "serve" ] diff --git a/rust-connector-sdk/src/connector.rs b/rust-connector-sdk/src/connector.rs index a2a2fda9..3e41dfcd 100644 --- a/rust-connector-sdk/src/connector.rs +++ b/rust-connector-sdk/src/connector.rs @@ -1,4 +1,4 @@ -use std::error::Error; +use std::{error::Error, path::PathBuf}; use async_trait::async_trait; use ndc_client::models; @@ -32,16 +32,6 @@ pub enum KeyOrIndex { Index(u32), } -/// Errors which occur when trying to validate connector -/// configuration. -/// -/// See [`Connector::update_configuration`]. -#[derive(Debug, Error)] -pub enum UpdateConfigurationError { - #[error("error validating configuration: {0}")] - Other(#[from] Box), -} - /// Errors which occur when trying to initialize connector /// state. /// @@ -201,23 +191,15 @@ pub enum MutationError { /// connection string would be state. #[async_trait] pub trait Connector { - /// The type of unvalidated, raw configuration, as provided by the user. - type RawConfiguration; /// The type of validated configuration type Configuration; /// The type of unserializable state type State; - fn make_empty_configuration() -> Self::RawConfiguration; - - async fn update_configuration( - config: Self::RawConfiguration, - ) -> Result; - /// Validate the raw configuration provided by the user, /// returning a configuration error or a validated [`Connector::Configuration`]. async fn validate_raw_configuration( - configuration: Self::RawConfiguration, + configuration_dir: PathBuf, ) -> Result; /// Initialize the connector's in-memory state. diff --git a/rust-connector-sdk/src/connector/example.rs b/rust-connector-sdk/src/connector/example.rs index f00834dd..8b0f2ce9 100644 --- a/rust-connector-sdk/src/connector/example.rs +++ b/rust-connector-sdk/src/connector/example.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::path::PathBuf; use async_trait::async_trait; use tracing::info_span; @@ -11,20 +12,11 @@ pub struct Example {} #[async_trait] impl Connector for Example { - type RawConfiguration = (); type Configuration = (); type State = (); - fn make_empty_configuration() -> Self::RawConfiguration {} - - async fn update_configuration( - _config: Self::RawConfiguration, - ) -> Result { - Ok(()) - } - async fn validate_raw_configuration( - _configuration: Self::Configuration, + _configuration_dir: PathBuf, ) -> Result { Ok(()) } diff --git a/rust-connector-sdk/src/default_main.rs b/rust-connector-sdk/src/default_main.rs index d5fe48a7..22789c90 100644 --- a/rust-connector-sdk/src/default_main.rs +++ b/rust-connector-sdk/src/default_main.rs @@ -2,7 +2,7 @@ mod v2_compat; use crate::{ check_health, - connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError}, + connector::Connector, json_rejection::JsonRejection, json_response::JsonResponse, routes, @@ -12,7 +12,7 @@ use axum_extra::extract::WithRejection; use std::error::Error; use std::net; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::process::exit; use async_trait::async_trait; @@ -24,7 +24,6 @@ use axum::{ routing::{get, post}, Json, Router, }; -use base64::{engine::general_purpose, Engine}; use clap::{Parser, Subcommand}; use ndc_client::models::{ CapabilitiesResponse, ErrorResponse, ExplainResponse, MutationRequest, MutationResponse, @@ -32,11 +31,8 @@ use ndc_client::models::{ }; use ndc_test::report; use prometheus::Registry; -use schemars::{schema::RootSchema, JsonSchema}; use serde::{de::DeserializeOwned, Serialize}; -use tower_http::{ - cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer, -}; +use tower_http::{trace::TraceLayer, validate_request::ValidateRequestHeaderLayer}; #[derive(Parser)] struct CliArgs { @@ -46,10 +42,8 @@ struct CliArgs { #[derive(Clone, Subcommand)] enum Command { - #[command(arg_required_else_help = true)] - Serve(ServeCommand), #[command()] - Configuration(ConfigurationCommand), + Serve(ServeCommand), #[command()] Test(TestCommand), #[command()] @@ -60,61 +54,55 @@ enum Command { #[derive(Clone, Parser)] struct ServeCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg( + long, + value_name = "DIRECTORY", + env = "HASURA_CONFIGURATION_DIRECTORY", + default_value = "/etc/connector" + )] configuration: PathBuf, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] - port: Port, + #[arg(long, value_name = "ENDPOINT", env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + otlp_endpoint: Option, #[arg( long, - value_name = "SERVICE_TOKEN_SECRET", - env = "SERVICE_TOKEN_SECRET" + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 )] + port: Port, + #[arg(long, value_name = "TOKEN", env = "HASURA_SERVICE_TOKEN_SECRET")] service_token_secret: Option, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] + #[arg(long, value_name = "NAME", env = "OTEL_SERVICE_NAME")] service_name: Option, - #[arg(long, env = "ENABLE_V2_COMPATIBILITY")] + #[arg(long, env = "HASURA_ENABLE_V2_COMPATIBILITY")] enable_v2_compatibility: bool, } -#[derive(Clone, Parser)] -struct ConfigurationCommand { - #[command(subcommand)] - command: ConfigurationSubcommand, -} - -#[derive(Clone, Subcommand)] -enum ConfigurationSubcommand { - #[command()] - Serve(ServeConfigurationCommand), -} - -#[derive(Clone, Parser)] -struct ServeConfigurationCommand { - #[arg(long, value_name = "PORT", env = "PORT", default_value = "9100")] - port: Port, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] - service_name: Option, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface -} - #[derive(Clone, Parser)] struct TestCommand { #[arg(long, value_name = "SEED", env = "SEED")] seed: Option, - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg( + long, + value_name = "DIRECTORY", + env = "HASURA_CONFIGURATION_DIRECTORY", + default_value = "/etc/connector" + )] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: Option, } #[derive(Clone, Parser)] struct ReplayCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg( + long, + value_name = "DIRECTORY", + env = "HASURA_CONFIGURATION_DIRECTORY", + default_value = "/etc/connector" + )] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: PathBuf, } @@ -122,7 +110,12 @@ struct ReplayCommand { struct CheckHealthCommand { #[arg(long, value_name = "HOST")] host: Option, - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] + #[arg( + long, + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 + )] port: Port, } @@ -161,7 +154,6 @@ pub struct ServerState { pub async fn default_main( ) -> Result<(), Box> where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, C::State: Sync + Send + Clone, { @@ -169,7 +161,6 @@ where match command { Command::Serve(serve_command) => serve::(serve_command).await, - Command::Configuration(configure_command) => configuration::(configure_command).await, Command::Test(test_command) => test::(test_command).await, Command::Replay(replay_command) => replay::(replay_command).await, Command::CheckHealth(check_health_command) => check_health(check_health_command).await, @@ -180,7 +171,6 @@ async fn serve( serve_command: ServeCommand, ) -> Result<(), Box> where - C::RawConfiguration: DeserializeOwned + Sync + Send, C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, C::State: Sync + Send + Clone, { @@ -243,17 +233,13 @@ where /// Initialize the server state from the configuration file. pub async fn init_server_state( - config_file: impl AsRef, + config_directory: PathBuf, ) -> ServerState where - C::RawConfiguration: DeserializeOwned + Sync + Send, C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, C::State: Sync + Send + Clone, { - let configuration_json = std::fs::read_to_string(config_file).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) + let configuration = C::validate_raw_configuration(config_directory) .await .unwrap(); @@ -274,7 +260,6 @@ pub fn create_router( service_token_secret: Option, ) -> Router where - C::RawConfiguration: DeserializeOwned + Sync + Send, C::Configuration: Serialize + Clone + Sync + Send, C::State: Sync + Send + Clone, { @@ -355,7 +340,6 @@ pub fn create_v2_router( service_token_secret: Option, ) -> Router where - C::RawConfiguration: DeserializeOwned + Sync + Send, C::Configuration: Serialize + Clone + Sync + Send, C::State: Sync + Send + Clone, { @@ -476,195 +460,6 @@ async fn post_query( routes::post_query::(&state.configuration, &state.state, request).await } -async fn configuration( - command: ConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, - C::Configuration: Sync + Send + Serialize, -{ - match command.command { - ConfigurationSubcommand::Serve(serve_command) => { - serve_configuration::(serve_command).await - } - } -} - -async fn serve_configuration( - serve_command: ServeConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema + Sync + Send, - C::Configuration: Sync + Send + Serialize, -{ - let port = serve_command.port; - let address = net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), port); - - init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) - .expect("Unable to initialize tracing"); - - println!("Starting server on {}", address); - - let cors = CorsLayer::new() - .allow_origin(tower_http::cors::Any) - .allow_headers(tower_http::cors::Any); - - let router = Router::new() - .route("/", get(get_empty::).post(post_update::)) - .route("/schema", get(get_config_schema::)) - .route("/validate", post(post_validate::)) - .route("/health", get(|| async {})) - .layer( - TraceLayer::new_for_http() - .make_span_with(make_span) - .on_response(on_response) - .on_failure(|err, _dur, _span: &_| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Request failure", - name = "Request failure", - body = %err, - error = true, - ); - }), - ) - .layer(cors); - - axum::Server::bind(&address) - .serve(router.into_make_service()) - .with_graceful_shutdown(async { - tokio::signal::ctrl_c() - .await - .expect("unable to install signal handler"); - }) - .await?; - - Ok(()) -} - -async fn get_empty() -> Json -where - C::RawConfiguration: Serialize, -{ - Json(C::make_empty_configuration()) -} - -async fn post_update( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, String)> -where - C::RawConfiguration: Serialize + DeserializeOwned, -{ - let updated = C::update_configuration(configuration) - .await - .map_err(|err| match err { - UpdateConfigurationError::Other(err) => { - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) - } - })?; - Ok(Json(updated)) -} - -async fn get_config_schema() -> Json -where - C::RawConfiguration: JsonSchema, -{ - let schema = schemars::schema_for!(C::RawConfiguration); - Json(schema) -} - -#[derive(Debug, Clone, Serialize)] -struct ValidateResponse { - schema: SchemaResponse, - capabilities: CapabilitiesResponse, - resolved_configuration: String, -} - -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -enum ValidateErrors { - InvalidConfiguration { ranges: Vec }, - UnableToBuildSchema, - UnableToBuildCapabilities, - JsonEncodingError(String), -} - -async fn post_validate( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> -where - C::RawConfiguration: DeserializeOwned, - C::Configuration: Serialize, -{ - let configuration = - C::validate_raw_configuration(configuration) - .await - .map_err(|e| match e { - crate::connector::ValidateError::ValidateError(ranges) => ( - StatusCode::BAD_REQUEST, - Json(ValidateErrors::InvalidConfiguration { ranges }), - ), - })?; - let schema = C::get_schema(&configuration) - .await - .and_then(JsonResponse::into_value) - .map_err(|e| match e { - SchemaError::Other(err) => { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build schema", - name = "Unable to build schema", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildSchema), - ) - } - })?; - let capabilities = - C::get_capabilities() - .await - .into_value() - .map_err(|e: Box| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build capabilities", - name = "Unable to build capabilities", - body = %e, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildCapabilities), - ) - })?; - let resolved_config_bytes = serde_json::to_vec(&configuration).map_err(|err| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to serialize validated configuration", - name = "Unable to serialize validated configuration", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::JsonEncodingError(err.to_string())), - ) - })?; - let resolved_configuration = general_purpose::STANDARD.encode(resolved_config_bytes); - Ok(Json(ValidateResponse { - schema, - capabilities, - resolved_configuration, - })) -} - struct ConnectorAdapter { configuration: C::Configuration, state: C::State, @@ -725,7 +520,6 @@ async fn test( command: TestCommand, ) -> Result<(), Box> where - C::RawConfiguration: DeserializeOwned, C::Configuration: Sync + Send + 'static, C::State: Send + Sync + 'static, { @@ -751,7 +545,6 @@ async fn replay( command: ReplayCommand, ) -> Result<(), Box> where - C::RawConfiguration: DeserializeOwned, C::Configuration: Sync + Send + 'static, C::State: Send + Sync + 'static, { @@ -769,15 +562,9 @@ where } async fn make_connector_adapter( - configuration_path: impl AsRef, -) -> ConnectorAdapter -where - C::RawConfiguration: DeserializeOwned, -{ - let configuration_json = std::fs::read_to_string(configuration_path).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) + configuration_path: PathBuf, +) -> ConnectorAdapter { + let configuration = C::validate_raw_configuration(configuration_path) .await .unwrap();