Skip to content

Commit

Permalink
Add an InitializationContext.
Browse files Browse the repository at this point in the history
The initializion context is a value of a type specified by the
connector, which can be injected into the entrypoint. This allows users
of the connector functions to provide a little more control over how the
connector configures itself.
  • Loading branch information
SamirTalwar committed Feb 23, 2024
1 parent 09edbd5 commit b924b7d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 40 deletions.
49 changes: 23 additions & 26 deletions rust-connector-sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,54 +221,50 @@ pub enum MutationError {

/// Connectors using this library should implement this trait.
///
/// It provides methods which implement the standard endpoints defined by the specification:
/// capabilities, schema, query, mutation, query/explain, and mutation/explain.
///
/// It provides methods which implement the standard endpoints
/// defined by the specification: capabilities, schema, query, mutation,
/// query/explain, and mutation/explain.
///
/// In addition, it introduces names for types to manage
/// state and configuration (if any), and provides any necessary context
/// for observability purposes (metrics, logging and tracing).
/// In addition, it introduces names for types to manage state and configuration (if any), and
/// provides any necessary context for observability purposes (metrics, logging and tracing).
///
/// ## Configuration
///
/// Connectors encapsulate data sources, and likely require configuration
/// (connection strings, web service tokens, etc.). The NDC specification
/// does not discuss this sort of configuration, because it is an
/// implementation detail of a specific connector, but it is useful to
/// adopt a convention here for simplified configuration management.
/// Connectors encapsulate data sources, and likely require configuration (connection strings, web
/// service tokens, etc.). The NDC specification does not discuss this sort of configuration,
/// because it is an implementation detail of a specific connector, but it is useful to adopt a
/// convention here for simplified configuration management.
///
/// Configuration is specified as JSON, validated, and stored in a binary
/// format.
/// Configuration is input as a directory, which needs to be processed by the connector. The format
/// of the files in the directory is connector-specific.
///
/// This trait defines two types for managing configuration:
/// In addition, the caller can provide a [`Connector::InitializationContext`] value to help
/// prepare the configuration. This might, for example, provide connector-specific secrets.
///
/// - [`Connector::RawConfiguration`] defines the type of unvalidated, raw
/// configuration.
/// - [`Connector::Configuration`] defines the type of validated
/// configuration. Ideally, invalid configuration should not be representable
/// in this form.
/// Once parsed, the configuration should be represented by the [`Connector::Configuration`] type,
/// which is then accessible on request.
///
/// ## State
///
/// In addition to configuration, this trait defines a type for state management:
///
/// - [`Connector::State`] defines the type of any unserializable runtime state.
///
/// State is distinguished from configuration in that it is not provided directly by
/// the user, and would not ordinarily be serializable. For example, a connection string
/// would be configuration, but a connection pool object created from that
/// connection string would be state.
/// State is distinguished from configuration in that it is not provided directly by the user, and
/// is transient. For example, a connection string would be configuration, but a connection pool
/// object created from that connection string would be state.
#[async_trait]
pub trait Connector {
/// The type of validated configuration
/// Context used to initialize the server state
type InitializationContext: Sync + Send;
/// The connector configuration, parsed and validated
type Configuration: Sync + Send;
/// The type of unserializable state
/// The transient state of the connector
type State: Sync + Send;

/// Validate the raw configuration provided by the user,
/// returning a configuration error or a validated [`Connector::Configuration`].
async fn parse_configuration(
context: &Self::InitializationContext,
configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError>;

Expand All @@ -280,6 +276,7 @@ pub trait Connector {
/// In addition, this function should register any
/// connector-specific metrics with the metrics registry.
async fn try_init_state(
context: &Self::InitializationContext,
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError>;
Expand Down
3 changes: 3 additions & 0 deletions rust-connector-sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ pub struct Example {}

#[async_trait]
impl Connector for Example {
type InitializationContext = ();
type Configuration = ();
type State = ();

async fn parse_configuration(
_context: &Self::InitializationContext,
_configuration_dir: impl AsRef<Path> + Send,
) -> Result<Self::Configuration, ParseError> {
Ok(())
}

async fn try_init_state(
_context: &Self::InitializationContext,
_configuration: &Self::Configuration,
_metrics: &mut prometheus::Registry,
) -> Result<Self::State, InitializationError> {
Expand Down
59 changes: 45 additions & 14 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,42 @@ where
/// not described in the [NDC specification](http://hasura.github.io/ndc-spec/).
/// Specifically:
///
/// - It reads configuration as JSON from a file specified on the command line,
/// - It reads configuration from a directory specified on the command line,
/// - It reports traces to an OTLP collector specified on the command line,
/// - Logs are written to stdout
///
/// It provides the default initialization context to the connector configuration.
pub async fn default_main<C: Connector + 'static>() -> Result<(), Box<dyn Error + Send + Sync>>
where
C::InitializationContext: Default,
C::Configuration: Clone,
C::State: Clone,
{
default_main_with::<C>(C::InitializationContext::default()).await
}

/// A default main function for a connector that takes an initialization context.
///
/// This works just like [`default_main`], but allows you to use a non-default context.
pub async fn default_main_with<C: Connector + 'static>(
context: C::InitializationContext,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
C::Configuration: Clone,
C::State: Clone,
{
let CliArgs { command } = CliArgs::parse();

match command {
Command::Serve(serve_command) => serve::<C>(serve_command).await,
Command::Test(test_command) => test::<C>(test_command).await,
Command::Replay(replay_command) => replay::<C>(replay_command).await,
Command::Serve(serve_command) => serve::<C>(&context, serve_command).await,
Command::Test(test_command) => test::<C>(&context, test_command).await,
Command::Replay(replay_command) => replay::<C>(&context, replay_command).await,
Command::CheckHealth(check_health_command) => check_health(check_health_command).await,
}
}

async fn serve<C: Connector + 'static>(
context: &C::InitializationContext,
serve_command: ServeCommand,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
Expand All @@ -173,7 +190,7 @@ where
init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint)
.expect("Unable to initialize tracing");

let server_state = init_server_state::<C>(serve_command.configuration).await;
let server_state = init_server_state::<C>(context, serve_command.configuration).await;

let router = create_router::<C>(
server_state.clone(),
Expand Down Expand Up @@ -229,12 +246,15 @@ where

/// Initialize the server state from the configuration file.
pub async fn init_server_state<C: Connector>(
context: &C::InitializationContext,
config_directory: impl AsRef<Path> + Send,
) -> ServerState<C> {
let configuration = C::parse_configuration(config_directory).await.unwrap();
let configuration = C::parse_configuration(context, config_directory)
.await
.unwrap();

let mut metrics = Registry::new();
let state = C::try_init_state(&configuration, &mut metrics)
let state = C::try_init_state(context, &configuration, &mut metrics)
.await
.unwrap();

Expand Down Expand Up @@ -510,13 +530,16 @@ impl<C: Connector> ndc_test::Connector for ConnectorAdapter<C> {
}
}

async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn test<C: Connector>(
context: &C::InitializationContext,
command: TestCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let test_configuration = ndc_test::TestConfiguration {
seed: command.seed,
snapshots_dir: command.snapshots_dir,
};

let connector = make_connector_adapter::<C>(command.configuration).await;
let connector = make_connector_adapter::<C>(context, command.configuration).await;
let results = ndc_test::test_connector(&test_configuration, &connector).await;

if !results.failures.is_empty() {
Expand All @@ -529,8 +552,11 @@ async fn test<C: Connector>(command: TestCommand) -> Result<(), Box<dyn Error +
Ok(())
}

async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter::<C>(command.configuration).await;
async fn replay<C: Connector>(
context: &C::InitializationContext,
command: ReplayCommand,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let connector = make_connector_adapter::<C>(context, command.configuration).await;
let results = ndc_test::test_snapshots_in_directory(&connector, command.snapshots_dir).await;

if !results.failures.is_empty() {
Expand All @@ -543,11 +569,16 @@ async fn replay<C: Connector>(command: ReplayCommand) -> Result<(), Box<dyn Erro
Ok(())
}

async fn make_connector_adapter<C: Connector>(configuration_path: PathBuf) -> ConnectorAdapter<C> {
let configuration = C::parse_configuration(configuration_path).await.unwrap();
async fn make_connector_adapter<C: Connector>(
context: &C::InitializationContext,
configuration_path: PathBuf,
) -> ConnectorAdapter<C> {
let configuration = C::parse_configuration(context, configuration_path)
.await
.unwrap();

let mut metrics = Registry::new();
let state = C::try_init_state(&configuration, &mut metrics)
let state = C::try_init_state(context, &configuration, &mut metrics)
.await
.unwrap();

Expand Down

0 comments on commit b924b7d

Please sign in to comment.