diff --git a/Dockerfile b/Dockerfile index 0fb9a76a..972c929d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,11 @@ COPY ./rust-connector-sdk . RUN cargo build --release FROM debian:buster-slim as connector +RUN set -ex; \ + apt-get update; \ + DEBIAN_FRONTEND=noninteractive \ + apt-get install --no-install-recommends --assume-yes \ + libssl-dev COPY --from=build /app/target/release/ndc_hub_example ./ndc_hub_example ENTRYPOINT [ "/ndc_hub_example" ] -CMD [ "serve", "--port", "8080" ] +CMD [ "serve", "--configuration", "/etc/connector" ] diff --git a/rust-connector-sdk/src/connector.rs b/rust-connector-sdk/src/connector.rs index 593bef22..0c4488c9 100644 --- a/rust-connector-sdk/src/connector.rs +++ b/rust-connector-sdk/src/connector.rs @@ -1,4 +1,4 @@ -use std::error::Error; +use std::{error::Error, path::Path}; use async_trait::async_trait; use ndc_client::models; @@ -32,16 +32,6 @@ pub enum KeyOrIndex { Index(u32), } -/// Errors which occur when trying to validate connector -/// configuration. -/// -/// See [`Connector::update_configuration`]. -#[derive(Debug, Error)] -pub enum UpdateConfigurationError { - #[error("error validating configuration: {0}")] - Other(#[from] Box), -} - /// Errors which occur when trying to initialize connector /// state. /// @@ -201,23 +191,15 @@ pub enum MutationError { /// connection string would be state. #[async_trait] pub trait Connector { - /// The type of unvalidated, raw configuration, as provided by the user. - type RawConfiguration: Sync + Send; /// The type of validated configuration type Configuration: Sync + Send; /// The type of unserializable state type State: Sync + Send; - fn make_empty_configuration() -> Self::RawConfiguration; - - async fn update_configuration( - config: Self::RawConfiguration, - ) -> Result; - /// Validate the raw configuration provided by the user, /// returning a configuration error or a validated [`Connector::Configuration`]. - async fn validate_raw_configuration( - configuration: Self::RawConfiguration, + async fn parse_configuration( + configuration_dir: impl AsRef + Send, ) -> Result; /// Initialize the connector's in-memory state. diff --git a/rust-connector-sdk/src/connector/example.rs b/rust-connector-sdk/src/connector/example.rs index 55df2604..0a19db29 100644 --- a/rust-connector-sdk/src/connector/example.rs +++ b/rust-connector-sdk/src/connector/example.rs @@ -11,20 +11,11 @@ pub struct Example {} #[async_trait] impl Connector for Example { - type RawConfiguration = (); type Configuration = (); type State = (); - fn make_empty_configuration() -> Self::RawConfiguration {} - - async fn update_configuration( - _config: Self::RawConfiguration, - ) -> Result { - Ok(()) - } - - async fn validate_raw_configuration( - _configuration: Self::Configuration, + async fn parse_configuration( + _configuration_dir: impl AsRef + Send, ) -> Result { Ok(()) } diff --git a/rust-connector-sdk/src/default_main.rs b/rust-connector-sdk/src/default_main.rs index 5f068e33..9291521c 100644 --- a/rust-connector-sdk/src/default_main.rs +++ b/rust-connector-sdk/src/default_main.rs @@ -1,15 +1,5 @@ mod v2_compat; -use crate::{ - check_health, - connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError}, - json_rejection::JsonRejection, - json_response::JsonResponse, - routes, - tracing::{init_tracing, make_span, on_response}, -}; -use axum_extra::extract::WithRejection; - use std::error::Error; use std::net; use std::path::{Path, PathBuf}; @@ -24,19 +14,24 @@ use axum::{ routing::{get, post}, Json, Router, }; -use base64::{engine::general_purpose, Engine}; +use axum_extra::extract::WithRejection; use clap::{Parser, Subcommand}; +use prometheus::Registry; +use serde::Serialize; +use tower_http::{trace::TraceLayer, validate_request::ValidateRequestHeaderLayer}; + use ndc_client::models::{ CapabilitiesResponse, ErrorResponse, ExplainResponse, MutationRequest, MutationResponse, QueryRequest, QueryResponse, SchemaResponse, }; use ndc_test::report; -use prometheus::Registry; -use schemars::{schema::RootSchema, JsonSchema}; -use serde::{de::DeserializeOwned, Serialize}; -use tower_http::{ - cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer, -}; + +use crate::check_health; +use crate::connector::Connector; +use crate::json_rejection::JsonRejection; +use crate::json_response::JsonResponse; +use crate::routes; +use crate::tracing::{init_tracing, make_span, on_response}; #[derive(Parser)] struct CliArgs { @@ -46,10 +41,8 @@ struct CliArgs { #[derive(Clone, Subcommand)] enum Command { - #[command(arg_required_else_help = true)] - Serve(ServeCommand), #[command()] - Configuration(ConfigurationCommand), + Serve(ServeCommand), #[command()] Test(TestCommand), #[command()] @@ -60,61 +53,40 @@ enum Command { #[derive(Clone, Parser)] struct ServeCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] - port: Port, + #[arg(long, value_name = "ENDPOINT", env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + otlp_endpoint: Option, #[arg( long, - value_name = "SERVICE_TOKEN_SECRET", - env = "SERVICE_TOKEN_SECRET" + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 )] + port: Port, + #[arg(long, value_name = "TOKEN", env = "HASURA_SERVICE_TOKEN_SECRET")] service_token_secret: Option, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] + #[arg(long, value_name = "NAME", env = "OTEL_SERVICE_NAME")] service_name: Option, - #[arg(long, env = "ENABLE_V2_COMPATIBILITY")] + #[arg(long, env = "HASURA_ENABLE_V2_COMPATIBILITY")] enable_v2_compatibility: bool, } -#[derive(Clone, Parser)] -struct ConfigurationCommand { - #[command(subcommand)] - command: ConfigurationSubcommand, -} - -#[derive(Clone, Subcommand)] -enum ConfigurationSubcommand { - #[command()] - Serve(ServeConfigurationCommand), -} - -#[derive(Clone, Parser)] -struct ServeConfigurationCommand { - #[arg(long, value_name = "PORT", env = "PORT", default_value = "9100")] - port: Port, - #[arg(long, value_name = "OTEL_SERVICE_NAME", env = "OTEL_SERVICE_NAME")] - service_name: Option, - #[arg(long, value_name = "OTLP_ENDPOINT", env = "OTLP_ENDPOINT")] - otlp_endpoint: Option, // NOTE: `tracing` crate uses `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` ENV variable, but we want to control the endpoint via CLI interface -} - #[derive(Clone, Parser)] struct TestCommand { #[arg(long, value_name = "SEED", env = "SEED")] seed: Option, - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: Option, } #[derive(Clone, Parser)] struct ReplayCommand { - #[arg(long, value_name = "CONFIGURATION_FILE", env = "CONFIGURATION_FILE")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_CONFIGURATION_DIRECTORY")] configuration: PathBuf, - #[arg(long, value_name = "DIRECTORY", env = "SNAPSHOTS_DIR")] + #[arg(long, value_name = "DIRECTORY", env = "HASURA_SNAPSHOTS_DIR")] snapshots_dir: PathBuf, } @@ -122,7 +94,12 @@ struct ReplayCommand { struct CheckHealthCommand { #[arg(long, value_name = "HOST")] host: Option, - #[arg(long, value_name = "PORT", env = "PORT", default_value = "8100")] + #[arg( + long, + value_name = "PORT", + env = "HASURA_CONNECTOR_PORT", + default_value_t = 8080 + )] port: Port, } @@ -174,7 +151,6 @@ where /// - Logs are written to stdout pub async fn default_main() -> Result<(), Box> where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema, C::Configuration: Clone + Serialize, C::State: Clone, { @@ -182,7 +158,6 @@ where match command { Command::Serve(serve_command) => serve::(serve_command).await, - Command::Configuration(configure_command) => configuration::(configure_command).await, Command::Test(test_command) => test::(test_command).await, Command::Replay(replay_command) => replay::(replay_command).await, Command::CheckHealth(check_health_command) => check_health(check_health_command).await, @@ -193,9 +168,8 @@ async fn serve( serve_command: ServeCommand, ) -> Result<(), Box> where - C::RawConfiguration: DeserializeOwned, - C::Configuration: Clone + Serialize, - C::State: Clone, + C::Configuration: Serialize + Clone, + C::State: Sync + Send + Clone, { init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) .expect("Unable to initialize tracing"); @@ -255,18 +229,10 @@ where } /// Initialize the server state from the configuration file. -pub async fn init_server_state( - config_file: impl AsRef, -) -> ServerState -where - C::RawConfiguration: DeserializeOwned, -{ - let configuration_json = std::fs::read_to_string(config_file).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) - .await - .unwrap(); +pub async fn init_server_state( + config_directory: impl AsRef + Send, +) -> ServerState { + let configuration = C::parse_configuration(config_directory).await.unwrap(); let mut metrics = Registry::new(); let state = C::try_init_state(&configuration, &mut metrics) @@ -493,188 +459,6 @@ async fn post_query( routes::post_query::(&state.configuration, &state.state, request).await } -async fn configuration( - command: ConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema, - C::Configuration: Serialize, -{ - match command.command { - ConfigurationSubcommand::Serve(serve_command) => { - serve_configuration::(serve_command).await - } - } -} - -async fn serve_configuration( - serve_command: ServeConfigurationCommand, -) -> Result<(), Box> -where - C::RawConfiguration: Serialize + DeserializeOwned + JsonSchema, - C::Configuration: Serialize, -{ - let port = serve_command.port; - let address = net::SocketAddr::new(net::IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), port); - - init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) - .expect("Unable to initialize tracing"); - - println!("Starting server on {}", address); - - let cors = CorsLayer::new() - .allow_origin(tower_http::cors::Any) - .allow_headers(tower_http::cors::Any); - - let router = Router::new() - .route("/", get(get_empty::).post(post_update::)) - .route("/schema", get(get_config_schema::)) - .route("/validate", post(post_validate::)) - .route("/health", get(|| async {})) - .layer( - TraceLayer::new_for_http() - .make_span_with(make_span) - .on_response(on_response) - .on_failure(|err, _dur, _span: &_| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Request failure", - name = "Request failure", - body = %err, - error = true, - ); - }), - ) - .layer(cors); - - axum::Server::bind(&address) - .serve(router.into_make_service()) - .with_graceful_shutdown(async { - tokio::signal::ctrl_c() - .await - .expect("unable to install signal handler"); - }) - .await?; - - Ok(()) -} - -async fn get_empty() -> Json { - Json(C::make_empty_configuration()) -} - -async fn post_update( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, String)> { - let updated = C::update_configuration(configuration) - .await - .map_err(|err| match err { - UpdateConfigurationError::Other(err) => { - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) - } - })?; - Ok(Json(updated)) -} - -async fn get_config_schema() -> Json -where - C::RawConfiguration: JsonSchema, -{ - let schema = schemars::schema_for!(C::RawConfiguration); - Json(schema) -} - -#[derive(Debug, Clone, Serialize)] -struct ValidateResponse { - schema: SchemaResponse, - capabilities: CapabilitiesResponse, - resolved_configuration: String, -} - -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -enum ValidateErrors { - InvalidConfiguration { ranges: Vec }, - UnableToBuildSchema, - UnableToBuildCapabilities, - JsonEncodingError(String), -} - -async fn post_validate( - WithRejection(Json(configuration), _): WithRejection, JsonRejection>, -) -> Result, (StatusCode, Json)> -where - C::Configuration: Serialize, -{ - let configuration = - C::validate_raw_configuration(configuration) - .await - .map_err(|e| match e { - crate::connector::ValidateError::ValidateError(ranges) => ( - StatusCode::BAD_REQUEST, - Json(ValidateErrors::InvalidConfiguration { ranges }), - ), - })?; - let schema = C::get_schema(&configuration) - .await - .and_then(JsonResponse::into_value) - .map_err(|e| match e { - SchemaError::Other(err) => { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build schema", - name = "Unable to build schema", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildSchema), - ) - } - })?; - let capabilities = - C::get_capabilities() - .await - .into_value() - .map_err(|e: Box| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to build capabilities", - name = "Unable to build capabilities", - body = %e, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::UnableToBuildCapabilities), - ) - })?; - let resolved_config_bytes = serde_json::to_vec(&configuration).map_err(|err| { - tracing::error!( - meta.signal_type = "log", - event.domain = "ndc", - event.name = "Unable to serialize validated configuration", - name = "Unable to serialize validated configuration", - body = %err, - error = true, - ); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ValidateErrors::JsonEncodingError(err.to_string())), - ) - })?; - let resolved_configuration = general_purpose::STANDARD.encode(resolved_config_bytes); - Ok(Json(ValidateResponse { - schema, - capabilities, - resolved_configuration, - })) -} - struct ConnectorAdapter { configuration: C::Configuration, state: C::State, @@ -727,12 +511,7 @@ impl ndc_test::Connector for ConnectorAdapter { } } -async fn test( - command: TestCommand, -) -> Result<(), Box> -where - C::RawConfiguration: DeserializeOwned, -{ +async fn test(command: TestCommand) -> Result<(), Box> { let test_configuration = ndc_test::TestConfiguration { seed: command.seed, snapshots_dir: command.snapshots_dir, @@ -751,12 +530,7 @@ where Ok(()) } -async fn replay( - command: ReplayCommand, -) -> Result<(), Box> -where - C::RawConfiguration: DeserializeOwned, -{ +async fn replay(command: ReplayCommand) -> Result<(), Box> { let connector = make_connector_adapter::(command.configuration).await; let results = ndc_test::test_snapshots_in_directory(&connector, command.snapshots_dir).await; @@ -770,18 +544,8 @@ where Ok(()) } -async fn make_connector_adapter( - configuration_path: impl AsRef, -) -> ConnectorAdapter -where - C::RawConfiguration: DeserializeOwned, -{ - let configuration_json = std::fs::read_to_string(configuration_path).unwrap(); - let raw_configuration = - serde_json::de::from_str::(configuration_json.as_str()).unwrap(); - let configuration = C::validate_raw_configuration(raw_configuration) - .await - .unwrap(); +async fn make_connector_adapter(configuration_path: PathBuf) -> ConnectorAdapter { + let configuration = C::parse_configuration(configuration_path).await.unwrap(); let mut metrics = Registry::new(); let state = C::try_init_state(&configuration, &mut metrics)