Skip to content

Commit

Permalink
Split out setup methods into their own trait.
Browse files Browse the repository at this point in the history
These now receive access to `&self`, allowing us to inject further
information into configuration.

A `default_main_with` function lets us provide alternative setups.
  • Loading branch information
SamirTalwar committed Feb 26, 2024
1 parent 393213d commit 791a4f9
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 84 deletions.
96 changes: 50 additions & 46 deletions rust-connector-sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,69 +221,42 @@ pub enum MutationError {

/// Connectors using this library should implement this trait.
///
/// It provides methods which implement the standard endpoints defined by the
/// specification: capabilities, schema, query, mutation, query/explain,
/// and mutation/explain.
///
/// It provides methods which implement the standard endpoints
/// defined by the specification: capabilities, schema, query, mutation,
/// query/explain, and mutation/explain.
///
/// In addition, it introduces names for types to manage
/// state and configuration (if any), and provides any necessary context
/// for observability purposes (metrics, logging and tracing).
/// In addition, it introduces names for types to manage state and configuration
/// (if any), and provides any necessary context for observability purposes
/// (metrics, logging and tracing).
///
/// ## Configuration
///
/// Connectors encapsulate data sources, and likely require configuration
/// (connection strings, web service tokens, etc.). The NDC specification
/// does not discuss this sort of configuration, because it is an
/// implementation detail of a specific connector, but it is useful to
/// adopt a convention here for simplified configuration management.
///
/// Configuration is specified as JSON, validated, and stored in a binary
/// format.
/// (connection strings, web service tokens, etc.). The NDC specification does
/// not discuss this sort of configuration, because it is an implementation
/// detail of a specific connector, but it is useful to adopt a convention here
/// for simplified configuration management.
///
/// This trait defines two types for managing configuration:
///
/// - [`Connector::RawConfiguration`] defines the type of unvalidated, raw
/// configuration.
/// - [`Connector::Configuration`] defines the type of validated
/// configuration. Ideally, invalid configuration should not be representable
/// in this form.
/// Configuration is specified by the connector implementation. It is provided
/// as a path to a directory. Parsing this directory should result in a
/// [`Connector::Configuration`].
///
/// ## State
///
/// In addition to configuration, this trait defines a type for state management:
///
/// - [`Connector::State`] defines the type of any unserializable runtime state.
/// In addition to configuration, this trait defines a [`Connector::State`] type
/// for state management.
///
/// State is distinguished from configuration in that it is not provided directly by
/// the user, and would not ordinarily be serializable. For example, a connection string
/// would be configuration, but a connection pool object created from that
/// connection string would be state.
/// State is distinguished from configuration in that it is not provided
/// directly by the user, and would not ordinarily be serializable. For example,
/// a connection string would be configuration, but a connection pool object
/// created from that connection string would be state.
#[async_trait]
pub trait Connector {
/// The type of validated configuration
type Configuration: Sync + Send;
/// The type of unserializable state
type State: Sync + Send;

/// Validate the raw configuration provided by the user,
/// returning a configuration error or a validated [`Connector::Configuration`].
async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError>;

/// Initialize the connector's in-memory state.
///
/// For example, any connection pools, prepared queries,
/// or other managed resources would be allocated here.
///
/// In addition, this function should register any
/// connector-specific metrics with the metrics registry.
async fn try_init_state(
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError>;

/// Update any metrics from the state
///
/// Note: some metrics can be updated directly, and do not
Expand Down Expand Up @@ -359,3 +332,34 @@ pub trait Connector {
request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, QueryError>;
}

/// Connectors are set up by values that implement this trait.
///
/// It provides a method for parsing configuration, and another for initializing
/// state.
///
/// See [`Connector`] for further details.
#[async_trait]
pub trait ConnectorSetup {
type Connector: Connector;

/// Validate the configuration provided by the user, returning a
/// configuration error or a validated [`Connector::Configuration`].
async fn parse_configuration(
&self,
configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self::Connector as Connector>::Configuration, ParseError>;

/// Initialize the connector's in-memory state.
///
/// For example, any connection pools, prepared queries, or other managed
/// resources would be allocated here.
///
/// In addition, this function should register any connector-specific
/// metrics with the metrics registry.
async fn try_init_state(
&self,
configuration: &<Self::Connector as Connector>::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<<Self::Connector as Connector>::State, InitializationError>;
}
19 changes: 13 additions & 6 deletions rust-connector-sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,29 @@ use super::*;
pub struct Example {}

#[async_trait]
impl Connector for Example {
type Configuration = ();
type State = ();
impl ConnectorSetup for Example {
type Connector = Self;

async fn parse_configuration(
&self,
_configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError> {
) -> Result<<Self as Connector>::Configuration, ParseError> {
Ok(())
}

async fn try_init_state(
_configuration: &Self::Configuration,
&self,
_configuration: &<Self as Connector>::Configuration,
_metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError> {
) -> Result<<Self as Connector>::State, InitializationError> {
Ok(())
}
}

#[async_trait]
impl Connector for Example {
type Configuration = ();
type State = ();

fn fetch_metrics(
_configuration: &Self::Configuration,
Expand Down
86 changes: 54 additions & 32 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use ndc_client::models::{
use ndc_test::report;

use crate::check_health;
use crate::connector::Connector;
use crate::connector::{Connector, ConnectorSetup};
use crate::json_rejection::JsonRejection;
use crate::json_response::JsonResponse;
use crate::routes;
Expand Down Expand Up @@ -158,34 +158,52 @@ impl<C: Connector> ServerState<C> {
/// - It reads configuration as JSON from a file specified on the command line,
/// - It reports traces to an OTLP collector specified on the command line,
/// - Logs are written to stdout
pub async fn default_main<C: Connector + 'static>() -> Result<(), Box<dyn Error + Send + Sync>>
pub async fn default_main<Setup>() -> Result<(), Box<dyn Error + Send + Sync>>
where
C::Configuration: Clone,
C::State: Clone,
Setup: ConnectorSetup + Default,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
default_main_with(Setup::default()).await
}

/// A default main function for a connector, with a non-default setup.
///
/// See [`default_main`] for further details.
pub async fn default_main_with<Setup>(setup: Setup) -> Result<(), Box<dyn Error + Send + Sync>>
where
Setup: ConnectorSetup,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
let CliArgs { command } = CliArgs::parse();

match command {
Command::Serve(serve_command) => serve::<C>(serve_command).await,
Command::Test(test_command) => test::<C>(test_command).await,
Command::Replay(replay_command) => replay::<C>(replay_command).await,
Command::Serve(serve_command) => serve(setup, serve_command).await,
Command::Test(test_command) => test(setup, test_command).await,
Command::Replay(replay_command) => replay(setup, replay_command).await,
Command::CheckHealth(check_health_command) => check_health(check_health_command).await,
}
}

async fn serve<C: Connector + 'static>(
async fn serve<Setup>(
setup: Setup,
serve_command: ServeCommand,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
C::Configuration: Clone,
C::State: Clone,
Setup: ConnectorSetup,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint)
.expect("Unable to initialize tracing");

let server_state = init_server_state::<C>(serve_command.configuration).await?;
let server_state = init_server_state(setup, serve_command.configuration).await?;

let router = create_router::<C>(
let router = create_router::<Setup::Connector>(
server_state.clone(),
serve_command.service_token_secret.clone(),
);
Expand Down Expand Up @@ -238,20 +256,19 @@ where
}

/// Initialize the server state from the configuration file.
pub async fn init_server_state<C: Connector>(
pub async fn init_server_state<Setup: ConnectorSetup>(
setup: Setup,
config_directory: impl AsRef<Path> + Send,
) -> Result<ServerState<C>, Box<dyn Error + Send + Sync>> {
) -> Result<ServerState<Setup::Connector>, Box<dyn Error + Send + Sync>> {
let mut metrics = Registry::new();
let configuration = C::parse_configuration(config_directory).await?;
let state = C::try_init_state(&configuration, &mut metrics).await?;
let configuration = setup.parse_configuration(config_directory).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ServerState::new(configuration, state, metrics))
}

pub fn create_router<C: Connector + 'static>(
state: ServerState<C>,
service_token_secret: Option<String>,
) -> Router
pub fn create_router<C>(state: ServerState<C>, service_token_secret: Option<String>) -> Router
where
C: Connector + 'static,
C::Configuration: Clone,
C::State: Clone,
{
Expand Down Expand Up @@ -328,11 +345,9 @@ where
))
}

pub fn create_v2_router<C: Connector + 'static>(
state: ServerState<C>,
service_token_secret: Option<String>,
) -> Router
pub fn create_v2_router<C>(state: ServerState<C>, service_token_secret: Option<String>) -> Router
where
C: Connector + 'static,
C::Configuration: Clone,
C::State: Clone,
{
Expand Down Expand Up @@ -512,13 +527,16 @@ impl<C: Connector> ndc_test::Connector for ConnectorAdapter<C> {
}
}

async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn test<Setup: ConnectorSetup>(
setup: Setup,
command: TestCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let test_configuration = ndc_test::TestConfiguration {
seed: command.seed,
snapshots_dir: command.snapshots_dir,
};

let connector = make_connector_adapter::<C>(command.configuration).await?;
let connector = make_connector_adapter(setup, command.configuration).await?;
let results = ndc_test::test_connector(&test_configuration, &connector).await;

if !results.failures.is_empty() {
Expand All @@ -531,8 +549,11 @@ async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error +
Ok(())
}

async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter::<C>(command.configuration).await?;
async fn replay<Setup: ConnectorSetup>(
setup: Setup,
command: ReplayCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter(setup, command.configuration).await?;
let results = ndc_test::test_snapshots_in_directory(&connector, command.snapshots_dir).await;

if !results.failures.is_empty() {
Expand All @@ -545,12 +566,13 @@ async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Erro
Ok(())
}

async fn make_connector_adapter<C: Connector>(
async fn make_connector_adapter<Setup: ConnectorSetup>(
setup: Setup,
configuration_path: PathBuf,
) -> Result<ConnectorAdapter<C>, Box<dyn Error + Send + Sync>> {
) -> Result<ConnectorAdapter<Setup::Connector>, Box<dyn Error + Send + Sync>> {
let mut metrics = Registry::new();
let configuration = C::parse_configuration(configuration_path).await?;
let state = C::try_init_state(&configuration, &mut metrics).await?;
let configuration = setup.parse_configuration(configuration_path).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ConnectorAdapter {
configuration,
state,
Expand Down

0 comments on commit 791a4f9

Please sign in to comment.