Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out setup methods into their own trait. #111

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 50 additions & 46 deletions rust-connector-sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,69 +221,42 @@ pub enum MutationError {

/// Connectors using this library should implement this trait.
///
/// It provides methods which implement the standard endpoints defined by the
/// specification: capabilities, schema, query, mutation, query/explain,
/// and mutation/explain.
///
/// It provides methods which implement the standard endpoints
/// defined by the specification: capabilities, schema, query, mutation,
/// query/explain, and mutation/explain.
///
/// In addition, it introduces names for types to manage
/// state and configuration (if any), and provides any necessary context
/// for observability purposes (metrics, logging and tracing).
/// In addition, it introduces names for types to manage state and configuration
/// (if any), and provides any necessary context for observability purposes
/// (metrics, logging and tracing).
///
/// ## Configuration
///
/// Connectors encapsulate data sources, and likely require configuration
/// (connection strings, web service tokens, etc.). The NDC specification
/// does not discuss this sort of configuration, because it is an
/// implementation detail of a specific connector, but it is useful to
/// adopt a convention here for simplified configuration management.
///
/// Configuration is specified as JSON, validated, and stored in a binary
/// format.
/// (connection strings, web service tokens, etc.). The NDC specification does
/// not discuss this sort of configuration, because it is an implementation
/// detail of a specific connector, but it is useful to adopt a convention here
/// for simplified configuration management.
///
/// This trait defines two types for managing configuration:
///
/// - [`Connector::RawConfiguration`] defines the type of unvalidated, raw
/// configuration.
/// - [`Connector::Configuration`] defines the type of validated
/// configuration. Ideally, invalid configuration should not be representable
/// in this form.
/// Configuration is specified by the connector implementation. It is provided
/// as a path to a directory. Parsing this directory should result in a
/// [`Connector::Configuration`].
///
/// ## State
///
/// In addition to configuration, this trait defines a type for state management:
///
/// - [`Connector::State`] defines the type of any unserializable runtime state.
/// In addition to configuration, this trait defines a [`Connector::State`] type
/// for state management.
///
/// State is distinguished from configuration in that it is not provided directly by
/// the user, and would not ordinarily be serializable. For example, a connection string
/// would be configuration, but a connection pool object created from that
/// connection string would be state.
/// State is distinguished from configuration in that it is not provided
/// directly by the user, and would not ordinarily be serializable. For example,
/// a connection string would be configuration, but a connection pool object
/// created from that connection string would be state.
#[async_trait]
pub trait Connector {
/// The type of validated configuration
type Configuration: Sync + Send;
/// The type of unserializable state
type State: Sync + Send;

/// Validate the raw configuration provided by the user,
/// returning a configuration error or a validated [`Connector::Configuration`].
async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError>;

/// Initialize the connector's in-memory state.
///
/// For example, any connection pools, prepared queries,
/// or other managed resources would be allocated here.
///
/// In addition, this function should register any
/// connector-specific metrics with the metrics registry.
async fn try_init_state(
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError>;

/// Update any metrics from the state
///
/// Note: some metrics can be updated directly, and do not
Expand Down Expand Up @@ -359,3 +332,34 @@ pub trait Connector {
request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, QueryError>;
}

/// Connectors are set up by values that implement this trait.
///
/// It provides a method for parsing configuration, and another for initializing
/// state.
///
/// See [`Connector`] for further details.
#[async_trait]
pub trait ConnectorSetup {
type Connector: Connector;

/// Validate the configuration provided by the user, returning a
/// configuration error or a validated [`Connector::Configuration`].
async fn parse_configuration(
&self,
configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self::Connector as Connector>::Configuration, ParseError>;

/// Initialize the connector's in-memory state.
///
/// For example, any connection pools, prepared queries, or other managed
/// resources would be allocated here.
///
/// In addition, this function should register any connector-specific
/// metrics with the metrics registry.
async fn try_init_state(
&self,
configuration: &<Self::Connector as Connector>::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<<Self::Connector as Connector>::State, InitializationError>;
}
19 changes: 13 additions & 6 deletions rust-connector-sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,29 @@ use super::*;
pub struct Example {}

#[async_trait]
impl Connector for Example {
type Configuration = ();
type State = ();
impl ConnectorSetup for Example {
type Connector = Self;

async fn parse_configuration(
&self,
_configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError> {
) -> Result<<Self as Connector>::Configuration, ParseError> {
Ok(())
}

async fn try_init_state(
_configuration: &Self::Configuration,
&self,
_configuration: &<Self as Connector>::Configuration,
_metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError> {
) -> Result<<Self as Connector>::State, InitializationError> {
Ok(())
}
}

#[async_trait]
impl Connector for Example {
type Configuration = ();
type State = ();

fn fetch_metrics(
_configuration: &Self::Configuration,
Expand Down
86 changes: 54 additions & 32 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use ndc_client::models::{
use ndc_test::report;

use crate::check_health;
use crate::connector::Connector;
use crate::connector::{Connector, ConnectorSetup};
use crate::json_rejection::JsonRejection;
use crate::json_response::JsonResponse;
use crate::routes;
Expand Down Expand Up @@ -158,34 +158,52 @@ impl<C: Connector> ServerState<C> {
/// - It reads configuration as JSON from a file specified on the command line,
/// - It reports traces to an OTLP collector specified on the command line,
/// - Logs are written to stdout
pub async fn default_main<C: Connector + 'static>() -> Result<(), Box<dyn Error + Send + Sync>>
pub async fn default_main<Setup>() -> Result<(), Box<dyn Error + Send + Sync>>
where
C::Configuration: Clone,
C::State: Clone,
Setup: ConnectorSetup + Default,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
default_main_with(Setup::default()).await
}

/// A default main function for a connector, with a non-default setup.
///
/// See [`default_main`] for further details.
pub async fn default_main_with<Setup>(setup: Setup) -> Result<(), Box<dyn Error + Send + Sync>>
where
Setup: ConnectorSetup,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
let CliArgs { command } = CliArgs::parse();

match command {
Command::Serve(serve_command) => serve::<C>(serve_command).await,
Command::Test(test_command) => test::<C>(test_command).await,
Command::Replay(replay_command) => replay::<C>(replay_command).await,
Command::Serve(serve_command) => serve(setup, serve_command).await,
Command::Test(test_command) => test(setup, test_command).await,
Command::Replay(replay_command) => replay(setup, replay_command).await,
Command::CheckHealth(check_health_command) => check_health(check_health_command).await,
}
}

async fn serve<C: Connector + 'static>(
async fn serve<Setup>(
setup: Setup,
serve_command: ServeCommand,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
C::Configuration: Clone,
C::State: Clone,
Setup: ConnectorSetup,
Setup::Connector: Connector + 'static,
<Setup::Connector as Connector>::Configuration: Clone,
<Setup::Connector as Connector>::State: Clone,
{
init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint)
.expect("Unable to initialize tracing");

let server_state = init_server_state::<C>(serve_command.configuration).await?;
let server_state = init_server_state(setup, serve_command.configuration).await?;

let router = create_router::<C>(
let router = create_router::<Setup::Connector>(
server_state.clone(),
serve_command.service_token_secret.clone(),
);
Expand Down Expand Up @@ -238,20 +256,19 @@ where
}

/// Initialize the server state from the configuration file.
pub async fn init_server_state<C: Connector>(
pub async fn init_server_state<Setup: ConnectorSetup>(
setup: Setup,
config_directory: impl AsRef<Path> + Send,
) -> Result<ServerState<C>, Box<dyn Error + Send + Sync>> {
) -> Result<ServerState<Setup::Connector>, Box<dyn Error + Send + Sync>> {
let mut metrics = Registry::new();
let configuration = C::parse_configuration(config_directory).await?;
let state = C::try_init_state(&configuration, &mut metrics).await?;
let configuration = setup.parse_configuration(config_directory).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ServerState::new(configuration, state, metrics))
}

pub fn create_router<C: Connector + 'static>(
state: ServerState<C>,
service_token_secret: Option<String>,
) -> Router
pub fn create_router<C>(state: ServerState<C>, service_token_secret: Option<String>) -> Router
where
C: Connector + 'static,
C::Configuration: Clone,
C::State: Clone,
{
Expand Down Expand Up @@ -328,11 +345,9 @@ where
))
}

pub fn create_v2_router<C: Connector + 'static>(
state: ServerState<C>,
service_token_secret: Option<String>,
) -> Router
pub fn create_v2_router<C>(state: ServerState<C>, service_token_secret: Option<String>) -> Router
where
C: Connector + 'static,
C::Configuration: Clone,
C::State: Clone,
{
Expand Down Expand Up @@ -512,13 +527,16 @@ impl<C: Connector> ndc_test::Connector for ConnectorAdapter<C> {
}
}

async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn test<Setup: ConnectorSetup>(
setup: Setup,
command: TestCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let test_configuration = ndc_test::TestConfiguration {
seed: command.seed,
snapshots_dir: command.snapshots_dir,
};

let connector = make_connector_adapter::<C>(command.configuration).await?;
let connector = make_connector_adapter(setup, command.configuration).await?;
let results = ndc_test::test_connector(&test_configuration, &connector).await;

if !results.failures.is_empty() {
Expand All @@ -531,8 +549,11 @@ async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error +
Ok(())
}

async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter::<C>(command.configuration).await?;
async fn replay<Setup: ConnectorSetup>(
setup: Setup,
command: ReplayCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter(setup, command.configuration).await?;
let results = ndc_test::test_snapshots_in_directory(&connector, command.snapshots_dir).await;

if !results.failures.is_empty() {
Expand All @@ -545,12 +566,13 @@ async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Erro
Ok(())
}

async fn make_connector_adapter<C: Connector>(
async fn make_connector_adapter<Setup: ConnectorSetup>(
setup: Setup,
configuration_path: PathBuf,
) -> Result<ConnectorAdapter<C>, Box<dyn Error + Send + Sync>> {
) -> Result<ConnectorAdapter<Setup::Connector>, Box<dyn Error + Send + Sync>> {
let mut metrics = Registry::new();
let configuration = C::parse_configuration(configuration_path).await?;
let state = C::try_init_state(&configuration, &mut metrics).await?;
let configuration = setup.parse_configuration(configuration_path).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ConnectorAdapter {
configuration,
state,
Expand Down
Loading