diff --git a/.github/workflows/deploy-stage.yaml b/.github/workflows/deploy-stage.yaml index fddf728..0fee01f 100644 --- a/.github/workflows/deploy-stage.yaml +++ b/.github/workflows/deploy-stage.yaml @@ -144,6 +144,10 @@ jobs: runs-on: ubuntu-latest if: ${{ startsWith(github.ref, 'refs/tags/v') }} steps: + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + rustflags: "" # defaults to "-D warnings", set to empty string to allow warnings + - uses: actions/checkout@v4 - uses: actions/download-artifact@v4 @@ -183,6 +187,9 @@ jobs: mkdir -p "${ROOT}/release/connector-definition/.hasura-connector/" cat "${ROOT}/ci/templates/connector-metadata.yaml" | envsubst > "${ROOT}/release/connector-definition/.hasura-connector/connector-metadata.yaml" + + cargo run --package ndc-clickhouse-cli -- --connector-context-path "${ROOT}/release/connector-definition" init + tar -czvf "${ROOT}/release/artifacts/connector-definition.tgz" --directory "${ROOT}/release/connector-definition/" . - uses: actions/upload-artifact@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ab5bae..cd3ad98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.10] + +- Correct CLI implementation of Init command (used to behave the same as update) +- Update sdk & errors +- Fix version returned by capabilities +- Fix parameter escaping, enabling complex data types as parameters +- Remove deprecated JSON data type, see [clickhouse docs](https://clickhouse.com/docs/en/sql-reference/data-types/object-data-type) + ## [0.2.9] - Change namespaceing to use `.` separator instead of `_`. We assume table names are less likely to contain periods, so this reduces likelyhood of naming conflicts.This will change generated type names and will thus manifest as a breaking change for some users diff --git a/Cargo.lock b/Cargo.lock index 29bc87b..aa7d7b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -334,7 +334,7 @@ checksum = "97af0562545a7d7f3d9222fcf909963bec36dcb502afaacab98c6ffac8da47ce" [[package]] name = "common" -version = "0.2.9" +version = "0.2.10" dependencies = [ "bytes", "peg", @@ -1059,7 +1059,7 @@ dependencies = [ [[package]] name = "ndc-clickhouse" -version = "0.2.9" +version = "0.2.10" dependencies = [ "async-trait", "bytes", @@ -1079,7 +1079,7 @@ dependencies = [ [[package]] name = "ndc-clickhouse-cli" -version = "0.2.9" +version = "0.2.10" dependencies = [ "clap", "common", @@ -1103,8 +1103,8 @@ dependencies = [ [[package]] name = "ndc-sdk" -version = "0.1.4" -source = "git+https://github.com/hasura/ndc-sdk-rs?tag=v0.1.4#29adcb5983c1237e8a5f4732d5230c2ba8ab75d3" +version = "0.1.5" +source = "git+https://github.com/hasura/ndc-sdk-rs?tag=v0.1.5#7f8382001b745c24b5f066411dde6822df65f545" dependencies = [ "async-trait", "axum", diff --git a/Cargo.toml b/Cargo.toml index d867865..d55eae1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,5 @@ members = [ ] resolver = "2" -package.version = "0.2.9" +package.version = "0.2.10" package.edition = "2021" diff --git a/crates/common/src/clickhouse_parser.rs b/crates/common/src/clickhouse_parser.rs index fe69a60..b939c6d 100644 --- a/crates/common/src/clickhouse_parser.rs +++ b/crates/common/src/clickhouse_parser.rs @@ -46,7 +46,6 @@ peg::parser! { / date_time() / date32() / date() - / json() / uuid() / ipv4() / ipv6() @@ -86,7 +85,6 @@ peg::parser! { rule date32() -> DT = "Date32" { DT::Date32 } rule date_time() -> DT = "DateTime" tz:("(" tz:single_quoted_string_value()? ")" { tz })? { DT::DateTime { timezone: tz.flatten().map(|s| s.to_owned()) } } rule date_time64() -> DT = "DateTime64(" precision:integer_value() tz:(comma_separator() tz:single_quoted_string_value()? { tz })? ")" { DT::DateTime64{ precision, timezone: tz.flatten().map(|s| s.to_owned())} } - rule json() -> DT = "JSON" { DT::Json } rule uuid() -> DT = "UUID" { DT::Uuid } rule ipv4() -> DT = "IPv4" { DT::IPv4 } rule ipv6() -> DT = "IPv6" { DT::IPv6 } diff --git a/crates/common/src/clickhouse_parser/datatype.rs b/crates/common/src/clickhouse_parser/datatype.rs index 63c4924..d5a2815 100644 --- a/crates/common/src/clickhouse_parser/datatype.rs +++ b/crates/common/src/clickhouse_parser/datatype.rs @@ -87,7 +87,7 @@ impl Display for AggregateFunctionParameter { /// A parsed representation of a clickhouse datatype string /// This should support the full scope of clickhouse types -/// To create one from a string slice, use try_from() +/// To create one from a string slice, use from_str() #[derive(Debug, Clone, PartialEq)] pub enum ClickHouseDataType { Nullable(Box), @@ -133,7 +133,6 @@ pub enum ClickHouseDataType { precision: u32, timezone: Option, }, - Json, Uuid, IPv4, IPv6, @@ -203,7 +202,6 @@ impl Display for ClickHouseDataType { } write!(f, ")") } - DT::Json => write!(f, "JSON"), DT::Uuid => write!(f, "UUID"), DT::IPv4 => write!(f, "IPv4"), DT::IPv6 => write!(f, "IPv6"), diff --git a/crates/common/src/config_file.rs b/crates/common/src/config_file.rs index a23d5f4..d79e600 100644 --- a/crates/common/src/config_file.rs +++ b/crates/common/src/config_file.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] /// the main configuration file pub struct ServerConfigFile { #[serde(rename = "$schema")] @@ -22,6 +22,16 @@ pub struct ServerConfigFile { pub queries: BTreeMap, } +impl Default for ServerConfigFile { + fn default() -> Self { + Self { + schema: CONFIG_SCHEMA_FILE_NAME.to_string(), + tables: Default::default(), + queries: Default::default(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] pub struct TableConfigFile { /// The table name diff --git a/crates/ndc-clickhouse-cli/src/database_introspection.rs b/crates/ndc-clickhouse-cli/src/database_introspection.rs index e555507..04fc67b 100644 --- a/crates/ndc-clickhouse-cli/src/database_introspection.rs +++ b/crates/ndc-clickhouse-cli/src/database_introspection.rs @@ -11,8 +11,10 @@ use common::{ pub struct TableInfo { pub table_name: String, pub table_schema: String, + #[allow(dead_code)] pub table_catalog: String, pub table_comment: Option, + #[allow(dead_code)] pub table_type: TableType, pub primary_key: Option, pub view_definition: String, @@ -23,6 +25,7 @@ pub struct TableInfo { pub struct ColumnInfo { pub column_name: String, pub data_type: String, + #[allow(dead_code)] pub is_nullable: bool, pub is_in_primary_key: bool, } diff --git a/crates/ndc-clickhouse-cli/src/main.rs b/crates/ndc-clickhouse-cli/src/main.rs index 157daac..e01dfec 100644 --- a/crates/ndc-clickhouse-cli/src/main.rs +++ b/crates/ndc-clickhouse-cli/src/main.rs @@ -60,16 +60,6 @@ struct CliArgs { env = "HASURA_PLUGIN_CONNECTOR_CONTEXT_PATH" )] context_path: Option, - #[arg(long = "clickhouse-url", value_name = "URL", env = "CLICKHOUSE_URL")] - clickhouse_url: String, - #[arg(long = "clickhouse-username", value_name = "USERNAME", env = "CLICKHOUSE_USERNAME", default_value_t = String::from("default"))] - clickhouse_username: String, - #[arg( - long = "clickhouse-password", - value_name = "PASSWORD", - env = "CLICKHOUSE_PASSWORD" - )] - clickhouse_password: String, #[command(subcommand)] command: Command, } @@ -77,7 +67,18 @@ struct CliArgs { #[derive(Clone, Subcommand)] enum Command { Init {}, - Update {}, + Update { + #[arg(long = "clickhouse-url", value_name = "URL", env = "CLICKHOUSE_URL")] + url: String, + #[arg(long = "clickhouse-username", value_name = "USERNAME", env = "CLICKHOUSE_USERNAME", default_value_t = String::from("default"))] + username: String, + #[arg( + long = "clickhouse-password", + value_name = "PASSWORD", + env = "CLICKHOUSE_PASSWORD" + )] + password: String, + }, Validate {}, Watch {}, } @@ -93,11 +94,6 @@ enum LogLevel { Trace, } -struct Context { - context_path: PathBuf, - connection: ConnectionConfig, -} - #[tokio::main] async fn main() -> Result<(), Box> { let args = CliArgs::parse(); @@ -107,33 +103,41 @@ async fn main() -> Result<(), Box> { Some(path) => path, }; - let connection = ConnectionConfig { - url: args.clickhouse_url, - username: args.clickhouse_username, - password: args.clickhouse_password, - }; - - let context = Context { - context_path, - connection, - }; - match args.command { Command::Init {} => { - let introspection = introspect_database(&context.connection).await?; - let config = update_tables_config(&context.context_path, &introspection).await?; - validate_table_config(&context.context_path, &config).await?; + let config = ServerConfigFile::default(); + let config_schema = schema_for!(ServerConfigFile); + + let file_path = context_path.join(CONFIG_FILE_NAME); + let schema_file_path = context_path.join(CONFIG_SCHEMA_FILE_NAME); + + fs::write(&file_path, serde_json::to_string_pretty(&config)?).await?; + fs::write( + &schema_file_path, + serde_json::to_string_pretty(&config_schema)?, + ) + .await?; } - Command::Update {} => { - let introspection = introspect_database(&context.connection).await?; - let config = update_tables_config(&context.context_path, &introspection).await?; - validate_table_config(&context.context_path, &config).await?; + Command::Update { + url, + username, + password, + } => { + let connection = ConnectionConfig { + url, + username, + password, + }; + + let introspection = introspect_database(&connection).await?; + let config = update_tables_config(&context_path, &introspection).await?; + validate_table_config(&context_path, &config).await?; } Command::Validate {} => { - let file_path = context.context_path.join(CONFIG_FILE_NAME); + let file_path = context_path.join(CONFIG_FILE_NAME); let config = read_config_file(&file_path).await?; if let Some(config) = config { - validate_table_config(&context.context_path, &config).await?; + validate_table_config(&context_path, &config).await?; } } Command::Watch {} => { diff --git a/crates/ndc-clickhouse/Cargo.toml b/crates/ndc-clickhouse/Cargo.toml index 95d0c8c..f348eed 100644 --- a/crates/ndc-clickhouse/Cargo.toml +++ b/crates/ndc-clickhouse/Cargo.toml @@ -8,7 +8,7 @@ async-trait = "0.1.78" bytes = "1.6.0" common = { path = "../common" } indexmap = "2.1.0" -ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs", tag = "v0.1.4", package = "ndc-sdk" } +ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs", tag = "v0.1.5", package = "ndc-sdk" } prometheus = "0.13.3" reqwest = { version = "0.12.3", features = [ "json", diff --git a/crates/ndc-clickhouse/src/connector.rs b/crates/ndc-clickhouse/src/connector.rs index 81ec2fd..aab40dc 100644 --- a/crates/ndc-clickhouse/src/connector.rs +++ b/crates/ndc-clickhouse/src/connector.rs @@ -1,53 +1,22 @@ pub mod handler; +pub mod setup; pub mod state; -use std::{collections::BTreeMap, env, path::Path, str::FromStr}; -use tokio::fs; - +use self::state::ServerState; use async_trait::async_trait; +use common::config::ServerConfig; use ndc_sdk::{ connector::{ - Connector, ConnectorSetup, ExplainError, FetchMetricsError, HealthError, - InitializationError, InvalidNode, InvalidNodes, KeyOrIndex, LocatedError, MutationError, - ParseError, QueryError, SchemaError, + Connector, ExplainError, FetchMetricsError, HealthError, MutationError, QueryError, + SchemaError, }, json_response::JsonResponse, models, }; -use self::state::ServerState; -use common::{ - clickhouse_parser::{datatype::ClickHouseDataType, parameterized_query::ParameterizedQuery}, - config::{ConnectionConfig, ParameterizedQueryConfig, ServerConfig, TableConfig, TableType}, - config_file::{ - ParameterizedQueryConfigFile, ReturnType, ServerConfigFile, TableConfigFile, - CONFIG_FILE_NAME, - }, -}; - #[derive(Debug, Clone, Default)] pub struct ClickhouseConnector; -#[async_trait] -impl ConnectorSetup for ClickhouseConnector { - type Connector = Self; - - async fn parse_configuration( - &self, - configuration_dir: impl AsRef + Send, - ) -> Result<::Configuration, ParseError> { - read_server_config(configuration_dir).await - } - - async fn try_init_state( - &self, - configuration: &::Configuration, - _metrics: &mut prometheus::Registry, - ) -> Result<::State, InitializationError> { - Ok(ServerState::new(configuration)) - } -} - #[async_trait] impl Connector for ClickhouseConnector { type Configuration = ServerConfig; @@ -67,11 +36,11 @@ impl Connector for ClickhouseConnector { let client = state .client(configuration) .await - .map_err(|err| HealthError::Other(err.to_string().into()))?; + .map_err(HealthError::new)?; common::client::ping(&client, &configuration.connection) .await - .map_err(|err| HealthError::Other(err.to_string().into()))?; + .map_err(HealthError::new)?; Ok(()) } @@ -99,8 +68,8 @@ impl Connector for ClickhouseConnector { _state: &Self::State, _request: models::MutationRequest, ) -> Result, ExplainError> { - Err(ExplainError::UnsupportedOperation( - "mutation explain not supported".to_string(), + Err(ExplainError::new_unsupported_operation( + &"mutation explain not supported", )) } @@ -109,8 +78,8 @@ impl Connector for ClickhouseConnector { _state: &Self::State, _request: models::MutationRequest, ) -> Result, MutationError> { - Err(MutationError::UnsupportedOperation( - "mutation not supported".to_string(), + Err(MutationError::new_unsupported_operation( + &"mutation not supported", )) } @@ -122,291 +91,3 @@ impl Connector for ClickhouseConnector { handler::query(configuration, state, request).await } } - -/// read server configuration from env var -pub async fn read_server_config( - configuration_dir: impl AsRef + Send, -) -> Result { - let connection = get_connection_config()?; - - let file_path = configuration_dir.as_ref().join(CONFIG_FILE_NAME); - - let config_file = fs::read_to_string(&file_path) - .await - .map_err(|err| match err.kind() { - std::io::ErrorKind::NotFound => { - ParseError::CouldNotFindConfiguration(file_path.to_owned()) - } - _ => ParseError::IoError(err), - })?; - - let config = serde_json::from_str::(&config_file).map_err(|err| { - ParseError::ParseError(LocatedError { - file_path: file_path.to_owned(), - line: err.line(), - column: err.column(), - message: err.to_string(), - }) - })?; - - let table_types = config - .tables - .iter() - .map(|(table_alias, table_config)| { - let table_type = validate_and_parse_return_type( - &table_config.return_type, - &config, - &file_path, - &["tables", &table_alias, "return_type"], - )? - .map(|columns| { - ( - table_alias.to_owned(), - TableType { - comment: table_config.comment.to_owned(), - columns, - }, - ) - }); - - Ok(table_type) - }) - .chain(config.queries.iter().map(|(query_alias, query_config)| { - let table_type = validate_and_parse_return_type( - &query_config.return_type, - &config, - &file_path, - &["query", &query_alias, "return_type"], - )? - .map(|columns| { - ( - query_alias.to_owned(), - TableType { - comment: query_config.comment.to_owned(), - columns, - }, - ) - }); - - Ok(table_type) - })) - .filter_map(|table_type| table_type.transpose()) - .collect::>()?; - - let tables = config - .tables - .iter() - .map(|(table_alias, table_config)| { - Ok(( - table_alias.clone(), - TableConfig { - name: table_config.name.to_owned(), - schema: table_config.schema.to_owned(), - comment: table_config.comment.to_owned(), - primary_key: table_config.primary_key.to_owned(), - return_type: match &table_config.return_type { - ReturnType::Definition { .. } => table_alias.to_owned(), - ReturnType::TableReference { - table_name: target_alias, - } - | ReturnType::QueryReference { - query_name: target_alias, - } => target_alias.to_owned(), - }, - arguments: table_config - .arguments - .iter() - .map(|(name, r#type)| { - let data_type = - ClickHouseDataType::from_str(r#type).map_err(|_err| { - ParseError::ValidateError(InvalidNodes(vec![InvalidNode { - file_path: file_path.to_owned(), - node_path: vec![ - KeyOrIndex::Key("tables".to_string()), - KeyOrIndex::Key(table_alias.to_owned()), - KeyOrIndex::Key("arguments".to_string()), - KeyOrIndex::Key(name.to_owned()), - ], - message: "Unable to parse data type".to_string(), - }])) - })?; - - Ok((name.to_owned(), data_type)) - }) - .collect::>()?, - }, - )) - }) - .collect::, ParseError>>()?; - - let mut queries = BTreeMap::new(); - - for (query_alias, query_config) in config.queries.clone() { - let query_file_path = configuration_dir.as_ref().join(&query_config.file); - let file_content = fs::read_to_string(&query_file_path).await.map_err(|err| { - if let std::io::ErrorKind::NotFound = err.kind() { - ParseError::CouldNotFindConfiguration(query_file_path.to_owned()) - } else { - ParseError::IoError(err) - } - })?; - - let query = ParameterizedQuery::from_str(&file_content).map_err(|err| { - ParseError::ValidateError(InvalidNodes(vec![InvalidNode { - file_path: query_file_path.clone(), - node_path: vec![ - KeyOrIndex::Key("queries".to_string()), - KeyOrIndex::Key(query_alias.clone()), - ], - message: format!("Unable to parse parameterized query: {}", err), - }])) - })?; - - let query_definition = ParameterizedQueryConfig { - exposed_as: query_config.exposed_as.to_owned(), - comment: query_config.comment.to_owned(), - query, - return_type: match query_config.return_type { - ReturnType::Definition { .. } => query_alias.to_owned(), - ReturnType::TableReference { - table_name: target_alias, - } - | ReturnType::QueryReference { - query_name: target_alias, - } => target_alias.to_owned(), - }, - }; - - queries.insert(query_alias.to_owned(), query_definition); - } - - let config = ServerConfig { - connection, - // hardcoding separator for now, to avoid prematurely exposing configuration options we may not want to keep - // if we make this configurable, we must default to this separator when the option is not provided - namespace_separator: ".".to_string(), - table_types, - tables, - queries, - }; - - Ok(config) -} - -fn get_connection_config() -> Result { - // define what the new configuration will look like - // assemble config from env vars and reading files in config directory - let url = env::var("CLICKHOUSE_URL") - .map_err(|_err| ParseError::Other("CLICKHOUSE_URL env var must be set".into()))?; - let username = env::var("CLICKHOUSE_USERNAME") - .map_err(|_err| ParseError::Other("CLICKHOUSE_USERNAME env var must be set".into()))?; - let password = env::var("CLICKHOUSE_PASSWORD") - .map_err(|_err| ParseError::Other("CLICKHOUSE_PASSWORD env var must be set".into()))?; - - Ok(ConnectionConfig { - url, - username, - password, - }) -} - -fn validate_and_parse_return_type( - return_type: &ReturnType, - config: &ServerConfigFile, - file_path: &Path, - node_path: &[&str], -) -> Result>, ParseError> { - let get_node_path = |extra_segments: &[&str]| { - node_path - .iter() - .chain(extra_segments.iter()) - .map(|s| KeyOrIndex::Key(s.to_string())) - .collect() - }; - match return_type { - ReturnType::TableReference { table_name } => { - match config.tables.get(table_name) { - Some(TableConfigFile { - return_type: ReturnType::Definition { .. }, - .. - }) => Ok(None), - Some(_) => { - Err(ParseError::ValidateError(InvalidNodes(vec![ - InvalidNode { - file_path: file_path.to_path_buf(), - node_path: get_node_path(&["table_name"]), - message: format!( - "Invalid reference: referenced table {} which does not have a return type definition", - table_name, - ), - }, - ]))) - } - None => { - Err(ParseError::ValidateError(InvalidNodes(vec![ - InvalidNode { - file_path: file_path.to_path_buf(), - node_path: get_node_path(&["table_name"]), - message: format!( - "Orphan reference: cannot find referenced table {}", - table_name, - ), - }, - ]))) - } - } - } - ReturnType::QueryReference { query_name } => { - match config.queries.get(query_name) { - Some(ParameterizedQueryConfigFile { - return_type: ReturnType::Definition { .. }, - .. - }) => Ok(None), - Some(_) => { - Err(ParseError::ValidateError(InvalidNodes(vec![ - InvalidNode { - file_path: file_path.to_path_buf(), - node_path: get_node_path(&["query_name"]), - message: format!( - "Invalid reference: referenced query {} which does not have a return type definition", - query_name, - ), - }, - ]))) - } - None => { - Err(ParseError::ValidateError(InvalidNodes(vec![ - InvalidNode { - file_path: file_path.to_path_buf(), - node_path: get_node_path(&["query_name"]), - message: format!( - "Orphan reference: cannot find referenced query {}", - query_name, - ), - }, - ]))) - } - } - } - ReturnType::Definition { columns } => Ok(Some( - - columns - .iter() - .map(|(field_alias, field_type)| { - let data_type = ClickHouseDataType::from_str(field_type).map_err(|err| { - ParseError::ValidateError(InvalidNodes(vec![InvalidNode { - file_path: file_path.to_path_buf(), - node_path: get_node_path(&["columns", field_alias]), - message: format!( - "Unable to parse data type \"{}\": {}", - field_type, err - ), - }])) - })?; - Ok((field_alias.to_owned(), data_type)) - }) - .collect::, ParseError>>()? - - )) - } -} diff --git a/crates/ndc-clickhouse/src/connector/handler/capabilities.rs b/crates/ndc-clickhouse/src/connector/handler/capabilities.rs index 72b524f..107ec4d 100644 --- a/crates/ndc-clickhouse/src/connector/handler/capabilities.rs +++ b/crates/ndc-clickhouse/src/connector/handler/capabilities.rs @@ -1,10 +1,13 @@ -use ndc_sdk::models::{self, LeafCapability, NestedFieldCapabilities, RelationshipCapabilities}; +use ndc_sdk::models::{ + Capabilities, CapabilitiesResponse, LeafCapability, MutationCapabilities, + NestedFieldCapabilities, QueryCapabilities, RelationshipCapabilities, +}; -pub fn capabilities() -> models::CapabilitiesResponse { - models::CapabilitiesResponse { +pub fn capabilities() -> CapabilitiesResponse { + CapabilitiesResponse { version: "0.1.4".to_owned(), - capabilities: models::Capabilities { - query: models::QueryCapabilities { + capabilities: Capabilities { + query: QueryCapabilities { aggregates: Some(LeafCapability {}), variables: Some(LeafCapability {}), explain: Some(LeafCapability {}), @@ -14,7 +17,7 @@ pub fn capabilities() -> models::CapabilitiesResponse { aggregates: None, }, }, - mutation: models::MutationCapabilities { + mutation: MutationCapabilities { transactional: None, explain: None, }, diff --git a/crates/ndc-clickhouse/src/connector/handler/explain.rs b/crates/ndc-clickhouse/src/connector/handler/explain.rs index 5e72ba9..665c812 100644 --- a/crates/ndc-clickhouse/src/connector/handler/explain.rs +++ b/crates/ndc-clickhouse/src/connector/handler/explain.rs @@ -16,23 +16,20 @@ pub async fn explain( state: &ServerState, request: models::QueryRequest, ) -> Result, ExplainError> { - let unsafe_statement = QueryBuilder::new(&request, configuration).build()?; + let inlined_statement = QueryBuilder::new(&request, configuration) + .build_inlined()? + .explain() + .to_string(); + let (parameterized_statement, parameters) = + QueryBuilder::new(&request, configuration).build_parameterized()?; + let parameterized_statement = parameterized_statement.explain().to_string(); - let unsafe_statement = unsafe_statement.explain(); - - let (statement, parameters) = unsafe_statement.clone().into_parameterized_statement(); - - let statement_string = statement.to_parameterized_sql_string(); - - let client = state - .client(configuration) - .await - .map_err(|err| ExplainError::Other(err.to_string().into()))?; + let client = state.client(configuration).await?; let explain = execute_json_query::( &client, &configuration.connection, - &statement_string, + ¶meterized_statement, ¶meters, ) .await @@ -47,18 +44,15 @@ pub async fn explain( let details = BTreeMap::from_iter(vec![ ( "SQL Query".to_string(), - add_variables_note( - &request, - &pretty_print_sql(&unsafe_statement.to_unsafe_sql_string()), - ), + add_variables_note(&request, &pretty_print_sql(&inlined_statement)), ), ( "Parameterized SQL Query".to_string(), - add_variables_note(&request, &pretty_print_sql(&statement_string)), + add_variables_note(&request, &pretty_print_sql(¶meterized_statement)), ), ( "Parameters".to_string(), - serde_json::to_string(¶meters).map_err(|err| ExplainError::Other(Box::new(err)))?, + serde_json::to_string(¶meters).map_err(ExplainError::new)?, ), ("Execution Plan".to_string(), explain), ]); diff --git a/crates/ndc-clickhouse/src/connector/handler/query.rs b/crates/ndc-clickhouse/src/connector/handler/query.rs index e82461b..896e9d8 100644 --- a/crates/ndc-clickhouse/src/connector/handler/query.rs +++ b/crates/ndc-clickhouse/src/connector/handler/query.rs @@ -12,8 +12,7 @@ pub async fn query( #[cfg(debug_assertions)] { // this block only present in debug builds, to avoid leaking sensitive information - let request_string = serde_json::to_string(&request) - .map_err(|err| QueryError::Other(err.to_string().into()))?; + let request_string = serde_json::to_string(&request).map_err(QueryError::new)?; tracing::event!(Level::DEBUG, "Incoming IR" = request_string); } @@ -21,28 +20,26 @@ pub async fn query( let (statement_string, parameters) = tracing::info_span!("Build SQL Query", internal.visibility = "user").in_scope( || -> Result<_, QueryError> { - let statement = QueryBuilder::new(&request, configuration).build()?; + let (statement, parameters) = + QueryBuilder::new(&request, configuration).build_parameterized()?; #[cfg(debug_assertions)] { // this block only present in debug builds, to avoid leaking sensitive information - let unsafe_statement_string = statement.to_unsafe_sql_string(); + let unsafe_statement_string = QueryBuilder::new(&request, configuration) + .build_inlined()? + .to_string(); tracing::event!(Level::DEBUG, "Generated SQL" = unsafe_statement_string); } - let (statement, parameters) = statement.into_parameterized_statement(); - - let statement_string = statement.to_parameterized_sql_string(); + let statement_string = statement.to_string(); Ok((statement_string, parameters)) }, )?; - let client = state - .client(configuration) - .await - .map_err(|err| QueryError::Other(err.to_string().into()))?; + let client = state.client(configuration).await.map_err(QueryError::new)?; let execution_span = tracing::info_span!( "Execute SQL query", @@ -60,13 +57,12 @@ pub async fn query( ) .instrument(execution_span) .await - .map_err(|err| QueryError::UnprocessableContent(err.to_string()))?; + .map_err(QueryError::new)?; #[cfg(debug_assertions)] { // this block only present in debug builds, to avoid leaking sensitive information - let result_string = - std::str::from_utf8(&rowsets).map_err(|err| QueryError::Other(err.into()))?; + let result_string = std::str::from_utf8(&rowsets).map_err(QueryError::new)?; tracing::event!(Level::DEBUG, "Response" = result_string); } diff --git a/crates/ndc-clickhouse/src/connector/setup.rs b/crates/ndc-clickhouse/src/connector/setup.rs new file mode 100644 index 0000000..845af7d --- /dev/null +++ b/crates/ndc-clickhouse/src/connector/setup.rs @@ -0,0 +1,357 @@ +use async_trait::async_trait; +use common::{ + clickhouse_parser::{datatype::ClickHouseDataType, parameterized_query::ParameterizedQuery}, + config::{ConnectionConfig, ParameterizedQueryConfig, ServerConfig, TableConfig, TableType}, + config_file::{ + ParameterizedQueryConfigFile, ReturnType, ServerConfigFile, TableConfigFile, + CONFIG_FILE_NAME, + }, +}; +use ndc_sdk::connector::{ + Connector, ConnectorSetup, InitializationError, InvalidNode, InvalidNodes, KeyOrIndex, + LocatedError, ParseError, +}; +use std::{ + collections::{BTreeMap, HashMap}, + env, + path::Path, + str::FromStr, +}; +use tokio::fs; + +use super::{state::ServerState, ClickhouseConnector}; +#[derive(Debug, Clone)] +pub struct ClickhouseConnectorSetup { + url: Option, + username: Option, + password: Option, +} + +#[async_trait] +impl ConnectorSetup for ClickhouseConnectorSetup { + type Connector = ClickhouseConnector; + + async fn parse_configuration( + &self, + configuration_dir: impl AsRef + Send, + ) -> Result<::Configuration, ParseError> { + self.read_server_config(configuration_dir).await + } + + async fn try_init_state( + &self, + configuration: &::Configuration, + _metrics: &mut prometheus::Registry, + ) -> Result<::State, InitializationError> { + Ok(ServerState::new(configuration)) + } +} + +impl Default for ClickhouseConnectorSetup { + fn default() -> Self { + Self { + url: env::var("CLICKHOUSE_URL").ok(), + username: env::var("CLICKHOUSE_USERNAME").ok(), + password: env::var("CLICKHOUSE_PASSWORD").ok(), + } + } +} + +impl ClickhouseConnectorSetup { + pub fn new_from_env(env: HashMap) -> Self { + Self { + url: env.get("CLICKHOUSE_URL").cloned(), + username: env.get("CLICKHOUSE_USERNAME").cloned(), + password: env.get("CLICKHOUSE_PASSWORD").cloned(), + } + } + pub async fn read_server_config( + &self, + configuration_dir: impl AsRef + Send, + ) -> Result { + let connection = self.get_connection_config()?; + + let file_path = configuration_dir.as_ref().join(CONFIG_FILE_NAME); + + let config_file = fs::read_to_string(&file_path) + .await + .map_err(|err| match err.kind() { + std::io::ErrorKind::NotFound => { + ParseError::CouldNotFindConfiguration(file_path.to_owned()) + } + _ => ParseError::IoError(err), + })?; + + let config = serde_json::from_str::(&config_file).map_err(|err| { + ParseError::ParseError(LocatedError { + file_path: file_path.to_owned(), + line: err.line(), + column: err.column(), + message: err.to_string(), + }) + })?; + + let table_types = config + .tables + .iter() + .map(|(table_alias, table_config)| { + let table_type = self + .validate_and_parse_return_type( + &table_config.return_type, + &config, + &file_path, + &["tables", table_alias, "return_type"], + )? + .map(|columns| { + ( + table_alias.to_owned(), + TableType { + comment: table_config.comment.to_owned(), + columns, + }, + ) + }); + + Ok(table_type) + }) + .chain(config.queries.iter().map(|(query_alias, query_config)| { + let table_type = self + .validate_and_parse_return_type( + &query_config.return_type, + &config, + &file_path, + &["query", query_alias, "return_type"], + )? + .map(|columns| { + ( + query_alias.to_owned(), + TableType { + comment: query_config.comment.to_owned(), + columns, + }, + ) + }); + + Ok(table_type) + })) + .filter_map(|table_type| table_type.transpose()) + .collect::>()?; + + let tables = config + .tables + .iter() + .map(|(table_alias, table_config)| { + Ok(( + table_alias.clone(), + TableConfig { + name: table_config.name.to_owned(), + schema: table_config.schema.to_owned(), + comment: table_config.comment.to_owned(), + primary_key: table_config.primary_key.to_owned(), + return_type: match &table_config.return_type { + ReturnType::Definition { .. } => table_alias.to_owned(), + ReturnType::TableReference { + table_name: target_alias, + } + | ReturnType::QueryReference { + query_name: target_alias, + } => target_alias.to_owned(), + }, + arguments: table_config + .arguments + .iter() + .map(|(name, r#type)| { + let data_type = + ClickHouseDataType::from_str(r#type).map_err(|_err| { + ParseError::ValidateError(InvalidNodes(vec![InvalidNode { + file_path: file_path.to_owned(), + node_path: vec![ + KeyOrIndex::Key("tables".to_string()), + KeyOrIndex::Key(table_alias.to_owned()), + KeyOrIndex::Key("arguments".to_string()), + KeyOrIndex::Key(name.to_owned()), + ], + message: "Unable to parse data type".to_string(), + }])) + })?; + + Ok((name.to_owned(), data_type)) + }) + .collect::>()?, + }, + )) + }) + .collect::, ParseError>>()?; + + let mut queries = BTreeMap::new(); + + for (query_alias, query_config) in config.queries.clone() { + let query_file_path = configuration_dir.as_ref().join(&query_config.file); + let file_content = fs::read_to_string(&query_file_path).await.map_err(|err| { + if let std::io::ErrorKind::NotFound = err.kind() { + ParseError::CouldNotFindConfiguration(query_file_path.to_owned()) + } else { + ParseError::IoError(err) + } + })?; + + let query = ParameterizedQuery::from_str(&file_content).map_err(|err| { + ParseError::ValidateError(InvalidNodes(vec![InvalidNode { + file_path: query_file_path.clone(), + node_path: vec![ + KeyOrIndex::Key("queries".to_string()), + KeyOrIndex::Key(query_alias.clone()), + ], + message: format!("Unable to parse parameterized query: {}", err), + }])) + })?; + + let query_definition = ParameterizedQueryConfig { + exposed_as: query_config.exposed_as.to_owned(), + comment: query_config.comment.to_owned(), + query, + return_type: match query_config.return_type { + ReturnType::Definition { .. } => query_alias.to_owned(), + ReturnType::TableReference { + table_name: target_alias, + } + | ReturnType::QueryReference { + query_name: target_alias, + } => target_alias.to_owned(), + }, + }; + + queries.insert(query_alias.to_owned(), query_definition); + } + + let config = ServerConfig { + connection, + // hardcoding separator for now, to avoid prematurely exposing configuration options we may not want to keep + // if we make this configurable, we must default to this separator when the option is not provided + namespace_separator: ".".to_string(), + table_types, + tables, + queries, + }; + + Ok(config) + } + fn get_connection_config(&self) -> Result { + let url = self.url.to_owned().ok_or(ParseError::Other( + "CLICKHOUSE_URL env var must be set".into(), + ))?; + let username = self.username.to_owned().ok_or(ParseError::Other( + "CLICKHOUSE_USERNAME env var must be set".into(), + ))?; + let password = self.password.to_owned().ok_or(ParseError::Other( + "CLICKHOUSE_PASSWORD env var must be set".into(), + ))?; + + Ok(ConnectionConfig { + url, + username, + password, + }) + } + fn validate_and_parse_return_type( + &self, + return_type: &ReturnType, + config: &ServerConfigFile, + file_path: &Path, + node_path: &[&str], + ) -> Result>, ParseError> { + let get_node_path = |extra_segments: &[&str]| { + node_path + .iter() + .chain(extra_segments.iter()) + .map(|s| KeyOrIndex::Key(s.to_string())) + .collect() + }; + match return_type { + ReturnType::TableReference { table_name } => { + match config.tables.get(table_name) { + Some(TableConfigFile { + return_type: ReturnType::Definition { .. }, + .. + }) => Ok(None), + Some(_) => { + Err(ParseError::ValidateError(InvalidNodes(vec![ + InvalidNode { + file_path: file_path.to_path_buf(), + node_path: get_node_path(&["table_name"]), + message: format!( + "Invalid reference: referenced table {} which does not have a return type definition", + table_name, + ), + }, + ]))) + } + None => { + Err(ParseError::ValidateError(InvalidNodes(vec![ + InvalidNode { + file_path: file_path.to_path_buf(), + node_path: get_node_path(&["table_name"]), + message: format!( + "Orphan reference: cannot find referenced table {}", + table_name, + ), + }, + ]))) + } + } + } + ReturnType::QueryReference { query_name } => { + match config.queries.get(query_name) { + Some(ParameterizedQueryConfigFile { + return_type: ReturnType::Definition { .. }, + .. + }) => Ok(None), + Some(_) => { + Err(ParseError::ValidateError(InvalidNodes(vec![ + InvalidNode { + file_path: file_path.to_path_buf(), + node_path: get_node_path(&["query_name"]), + message: format!( + "Invalid reference: referenced query {} which does not have a return type definition", + query_name, + ), + }, + ]))) + } + None => { + Err(ParseError::ValidateError(InvalidNodes(vec![ + InvalidNode { + file_path: file_path.to_path_buf(), + node_path: get_node_path(&["query_name"]), + message: format!( + "Orphan reference: cannot find referenced query {}", + query_name, + ), + }, + ]))) + } + } + } + ReturnType::Definition { columns } => Ok(Some( + + columns + .iter() + .map(|(field_alias, field_type)| { + let data_type = ClickHouseDataType::from_str(field_type).map_err(|err| { + ParseError::ValidateError(InvalidNodes(vec![InvalidNode { + file_path: file_path.to_path_buf(), + node_path: get_node_path(&["columns", field_alias]), + message: format!( + "Unable to parse data type \"{}\": {}", + field_type, err + ), + }])) + })?; + Ok((field_alias.to_owned(), data_type)) + }) + .collect::, ParseError>>()? + + )) + } + } +} diff --git a/crates/ndc-clickhouse/src/main.rs b/crates/ndc-clickhouse/src/main.rs index ee4e011..5920887 100644 --- a/crates/ndc-clickhouse/src/main.rs +++ b/crates/ndc-clickhouse/src/main.rs @@ -1,9 +1,9 @@ -use ndc_clickhouse::connector::ClickhouseConnector; +use ndc_clickhouse::connector::setup::ClickhouseConnectorSetup; use ndc_sdk::default_main::default_main; use std::error::Error; #[tokio::main] async fn main() -> Result<(), Box> { - default_main::().await + default_main::().await } diff --git a/crates/ndc-clickhouse/src/schema/type_definition.rs b/crates/ndc-clickhouse/src/schema/type_definition.rs index c231c02..b8cec15 100644 --- a/crates/ndc-clickhouse/src/schema/type_definition.rs +++ b/crates/ndc-clickhouse/src/schema/type_definition.rs @@ -129,7 +129,6 @@ impl ClickHouseScalar { ClickHouseDataType::Date32 => Some(Rep::String), ClickHouseDataType::DateTime { .. } => Some(Rep::String), ClickHouseDataType::DateTime64 { .. } => Some(Rep::String), - ClickHouseDataType::Json => Some(Rep::JSON), ClickHouseDataType::Uuid => Some(Rep::String), ClickHouseDataType::IPv4 => Some(Rep::String), ClickHouseDataType::IPv6 => Some(Rep::String), @@ -399,7 +398,6 @@ impl ClickHouseScalar { ClickHouseDataType::DateTime { .. } | ClickHouseDataType::DateTime64 { .. } => { [equality_operators, ordering_operators].concat() } - ClickHouseDataType::Json => [equality_operators, ordering_operators].concat(), ClickHouseDataType::Uuid => [equality_operators, ordering_operators].concat(), ClickHouseDataType::IPv4 => [equality_operators, ordering_operators].concat(), ClickHouseDataType::IPv6 => [equality_operators, ordering_operators].concat(), diff --git a/crates/ndc-clickhouse/src/sql/ast.rs b/crates/ndc-clickhouse/src/sql/ast.rs index b69ced7..6779206 100644 --- a/crates/ndc-clickhouse/src/sql/ast.rs +++ b/crates/ndc-clickhouse/src/sql/ast.rs @@ -1,47 +1,8 @@ -#![allow(dead_code)] use std::fmt; - -mod parameter_extractor; +pub mod format; use common::clickhouse_parser::parameterized_query::ParameterType; -use parameter_extractor::ParameterExtractor; - -//.A statement containing placeholders where parameters used to be -// Should be paired with the corresponding parameters for execution -#[derive(Debug, Clone)] -pub struct ParameterizedStatement(Statement); - -impl ParameterizedStatement { - pub fn format>(self, format: S) -> Self { - Self(self.0.format(format)) - } - pub fn explain(self) -> Self { - Self(self.0.explain()) - } - pub fn to_parameterized_sql_string(&self) -> String { - self.0.to_string() - } -} - -/// A statement that contains parameters that may be user generated -/// Vulnerable to SQL injection -/// Should never be sent to the database, but may be useful as user-facing output when explaining requests -#[derive(Debug, Clone)] -pub struct UnsafeInlinedStatement(Statement); - -impl UnsafeInlinedStatement { - pub fn format>(self, format: S) -> Self { - Self(self.0.format(format)) - } - pub fn explain(self) -> Self { - Self(self.0.explain()) - } - pub fn to_unsafe_sql_string(&self) -> String { - self.0.to_string() - } - pub fn into_parameterized_statement(self) -> (ParameterizedStatement, Vec<(String, String)>) { - ParameterExtractor::extract_statement_parameters(self) - } -} +use format::{display_comma_separated, display_period_separated, display_separated, escape_string}; +use indexmap::IndexMap; #[derive(Debug, Clone)] pub struct Statement { @@ -132,12 +93,12 @@ impl Query { pub fn offset(self, offset: Option) -> Self { Self { offset, ..self } } - pub fn into_statement(self) -> UnsafeInlinedStatement { - UnsafeInlinedStatement(Statement { + pub fn into_statement(self) -> Statement { + Statement { query: self, format: None, explain: false, - }) + } } pub fn into_table_factor(self) -> TableFactor { TableFactor::Derived { @@ -156,27 +117,27 @@ impl Query { impl fmt::Display for Query { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if !self.with.is_empty() { - write!(f, "WITH {} ", display_separated(&self.with, ", "))?; + write!(f, "WITH {} ", display_comma_separated(&self.with))?; } - write!(f, "SELECT {}", display_separated(&self.select, ", "))?; + write!(f, "SELECT {}", display_comma_separated(&self.select))?; if !self.from.is_empty() { - write!(f, " FROM {}", display_separated(&self.from, ", "))?; + write!(f, " FROM {}", display_comma_separated(&self.from))?; } if let Some(predicate) = &self.predicate { write!(f, " WHERE {}", predicate)?; } if !self.group_by.is_empty() { - write!(f, " GROUP BY {}", display_separated(&self.group_by, ", "))?; + write!(f, " GROUP BY {}", display_comma_separated(&self.group_by))?; } if !self.order_by.is_empty() { - write!(f, " ORDER BY {}", display_separated(&self.order_by, ", "))?; + write!(f, " ORDER BY {}", display_comma_separated(&self.order_by))?; } if let Some(limit_by) = &self.limit_by { write!(f, " LIMIT {}", limit_by.limit)?; if let Some(offset) = limit_by.offset { write!(f, " OFFSET {}", offset)?; } - write!(f, " BY {}", display_separated(&limit_by.by, ", "))?; + write!(f, " BY {}", display_comma_separated(&limit_by.by))?; } if let Some(limit) = &self.limit { write!(f, " LIMIT {}", limit)?; @@ -296,7 +257,7 @@ impl fmt::Display for Join { match self.0 { JoinConstraint::On(expr) => write!(f, " ON {expr}"), JoinConstraint::Using(attrs) => { - write!(f, " USING({})", display_separated(attrs, ", ")) + write!(f, " USING({})", display_comma_separated(attrs)) } _ => Ok(()), } @@ -481,14 +442,14 @@ impl fmt::Display for NativeQuery { #[derive(Debug, Clone)] pub enum NativeQueryElement { String(String), - Parameter(Parameter), + Expr(Expr), } impl fmt::Display for NativeQueryElement { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { NativeQueryElement::String(s) => write!(f, "{s}"), - NativeQueryElement::Parameter(p) => write!(f, "{p}"), + NativeQueryElement::Expr(e) => write!(f, "{e}"), } } } @@ -515,7 +476,7 @@ impl ObjectName { impl fmt::Display for ObjectName { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", display_separated(&self.0, ".")) + write!(f, "{}", display_period_separated(&self.0)) } } @@ -568,7 +529,7 @@ impl fmt::Display for Expr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Expr::Identifier(ident) => write!(f, "{}", ident), - Expr::CompoundIdentifier(idents) => write!(f, "{}", display_separated(idents, ".")), + Expr::CompoundIdentifier(idents) => write!(f, "{}", display_period_separated(idents)), Expr::BinaryOp { left, op, right } => write!(f, "{} {} {}", left, op, right), Expr::Not(expr) => write!(f, "NOT {expr}"), Expr::Nested(expr) => write!(f, "({})", expr), @@ -576,26 +537,20 @@ impl fmt::Display for Expr { Expr::Parameter(p) => write!(f, "{}", p), Expr::Function(function) => write!(f, "{}", function), Expr::Lambda(lambda) => write!(f, "{}", lambda), - Expr::List(list) => write!(f, "({})", display_separated(list, ", ")), + Expr::List(list) => write!(f, "({})", display_comma_separated(list)), } } } #[derive(Debug, Clone)] -pub enum Parameter { - Value { - data_type: ParameterType, - value: Value, - }, - Placeholder { - data_type: ParameterType, - name: String, - }, +pub struct Parameter { + name: String, + data_type: ParameterType, } impl Parameter { - pub fn new(value: Value, data_type: ParameterType) -> Self { - Self::Value { data_type, value } + pub fn new(name: String, data_type: ParameterType) -> Self { + Self { name, data_type } } pub fn into_expr(self) -> Expr { Expr::Parameter(self) @@ -604,15 +559,7 @@ impl Parameter { impl fmt::Display for Parameter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Parameter::Value { - value, - data_type: _, - } => write!(f, "{value}"), - Parameter::Placeholder { name, data_type } => { - write!(f, "{{{}:{}}}", name, data_type) - } - } + write!(f, "{{{}:{}}}", self.name, self.data_type) } } @@ -639,7 +586,7 @@ impl fmt::Display for Lambda { write!( f, "({}) -> {}", - display_separated(&self.args, ", "), + display_comma_separated(&self.args), self.expr ) } @@ -697,7 +644,7 @@ impl fmt::Display for Function { "{}({}{})", self.name, if self.distinct { "DISTINCT " } else { "" }, - display_separated(&self.args, ", ") + display_comma_separated(&self.args) )?; if let Some(over) = &self.over { write!(f, " OVER ({})", over)?; @@ -717,14 +664,14 @@ impl fmt::Display for WindowSpec { write!( f, "PARTITION BY {}", - display_separated(&self.partition_by, ", ") + display_comma_separated(&self.partition_by) )?; } if !self.order_by.is_empty() { if !self.partition_by.is_empty() { write!(f, " ")?; } - write!(f, "ORDER BY {}", display_separated(&self.order_by, ", "))?; + write!(f, "ORDER BY {}", display_comma_separated(&self.order_by))?; } Ok(()) } @@ -844,6 +791,8 @@ pub enum Value { SingleQuotedString(String), Boolean(bool), Null, + Map(IndexMap), + Array(Vec), } impl Value { @@ -852,30 +801,24 @@ impl Value { } } -impl From for Value { - fn from(value: serde_json::Value) -> Self { - match value { - serde_json::Value::Null => Value::Null, - serde_json::Value::Bool(b) => Value::Boolean(b), - serde_json::Value::Number(n) => Value::Number(n.to_string()), - serde_json::Value::String(s) => Value::SingleQuotedString(s), - // note we may need to convert complex types into escaped json strings rather than json strings. - serde_json::Value::Array(_) => Value::SingleQuotedString(value.to_string()), - serde_json::Value::Object(_) => Value::SingleQuotedString(value.to_string()), - } - } -} - impl From<&serde_json::Value> for Value { fn from(value: &serde_json::Value) -> Self { - match value { - serde_json::Value::Null => Value::Null, - serde_json::Value::Bool(b) => Value::Boolean(b.to_owned()), - serde_json::Value::Number(n) => Value::Number(n.to_string()), - serde_json::Value::String(s) => Value::SingleQuotedString(s.to_owned()), - serde_json::Value::Array(_) => Value::SingleQuotedString(value.to_string()), - serde_json::Value::Object(_) => Value::SingleQuotedString(value.to_string()), + fn map_json_value(value: &serde_json::Value) -> Value { + match value { + serde_json::Value::Null => Value::Null, + serde_json::Value::Bool(b) => Value::Boolean(b.to_owned()), + serde_json::Value::Number(n) => Value::Number(n.to_string()), + serde_json::Value::String(s) => Value::SingleQuotedString(s.to_owned()), + serde_json::Value::Array(arr) => { + Value::Array(arr.iter().map(map_json_value).collect()) + } + serde_json::Value::Object(obj) => Value::Map(IndexMap::from_iter( + obj.into_iter() + .map(|(key, value)| (key.to_owned(), map_json_value(value))), + )), + } } + map_json_value(value) } } @@ -884,10 +827,7 @@ impl fmt::Display for Value { match self { Value::Number(n) => write!(f, "{}", n), Value::SingleQuotedString(s) => { - // clickhouse docs state that backslash and single quotes must be escaped - // docs: https://clickhouse.com/docs/en/sql-reference/syntax#syntax-string-literal - let escaped_value = s.to_owned().replace('\\', r#"\\"#).replace('\'', r#"\'"#); - write!(f, "'{}'", escaped_value) + write!(f, "'{}'", escape_string(s)) } Value::Boolean(b) => { if *b { @@ -897,6 +837,19 @@ impl fmt::Display for Value { } } Value::Null => write!(f, "NULL"), + Value::Map(obj) => { + write!( + f, + "{{{}}}", + display_separated(obj, ",", |f, (key, value)| write!( + f, + "'{}': {}", + escape_string(key), + value + )) + ) + } + Value::Array(arr) => write!(f, "[{}]", display_comma_separated(arr)), } } } @@ -962,36 +915,3 @@ impl fmt::Display for Ident { } } } - -pub struct DisplaySeparated<'a, T> -where - T: fmt::Display, -{ - slice: &'a [T], - separator: &'static str, -} - -fn display_separated<'a, T>(slice: &'a [T], separator: &'static str) -> DisplaySeparated<'a, T> -where - T: fmt::Display, -{ - DisplaySeparated { slice, separator } -} - -impl<'a, T> fmt::Display for DisplaySeparated<'a, T> -where - T: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut first = true; - for t in self.slice { - if first { - first = false; - } else { - write!(f, "{}", self.separator)?; - } - write!(f, "{}", t)?; - } - Ok(()) - } -} diff --git a/crates/ndc-clickhouse/src/sql/ast/format.rs b/crates/ndc-clickhouse/src/sql/ast/format.rs new file mode 100644 index 0000000..5b7def4 --- /dev/null +++ b/crates/ndc-clickhouse/src/sql/ast/format.rs @@ -0,0 +1,114 @@ +use std::fmt; + +pub struct EscapedString<'a>(&'a str); +impl<'a> fmt::Display for EscapedString<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for c in self.0.chars() { + match c { + '\t' => write!(f, "\\t")?, + '\n' => write!(f, "\\n")?, + '\r' => write!(f, "\\r")?, + '\'' => write!(f, "\\'")?, + '\\' => write!(f, "\\\\")?, + _ => write!(f, "{c}")?, + } + } + Ok(()) + } +} +/// clickhouse docs state that backslash and single quotes must be escaped +/// docs: https://clickhouse.com/docs/en/sql-reference/syntax#syntax-string-literal +pub fn escape_string(s: &str) -> EscapedString { + EscapedString(s) +} + +pub struct DisplaySeparated<'a, T, I, F> +where + F: Fn(&'_ mut fmt::Formatter, &I) -> fmt::Result, + &'a T: IntoIterator, +{ + list: &'a T, + separator: &'static str, + print: F, +} + +pub fn display_comma_separated<'a, T, I>( + list: &'a T, +) -> DisplaySeparated<'a, T, I, impl Fn(&mut fmt::Formatter, &I) -> fmt::Result> +where + &'a T: IntoIterator, + I: fmt::Display, +{ + DisplaySeparated { + list, + separator: ", ", + print: |f, i| write!(f, "{i}"), + } +} +pub fn display_period_separated<'a, T, I>( + list: &'a T, +) -> DisplaySeparated<'a, T, I, impl Fn(&mut fmt::Formatter, &I) -> fmt::Result> +where + &'a T: IntoIterator, + I: fmt::Display, +{ + DisplaySeparated { + list, + separator: ".", + print: |f, i| write!(f, "{i}"), + } +} + +pub fn display_separated<'a, T, I, F>( + list: &'a T, + separator: &'static str, + print: F, +) -> DisplaySeparated<'a, T, I, F> +where + &'a T: IntoIterator, + F: Fn(&'_ mut fmt::Formatter, &I) -> fmt::Result, +{ + DisplaySeparated { + list, + separator, + print, + } +} + +impl<'a, T, I, F> fmt::Display for DisplaySeparated<'a, T, I, F> +where + &'a T: IntoIterator, + F: Fn(&mut fmt::Formatter, &I) -> fmt::Result, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut first = true; + for item in self.list { + if first { + first = false + } else { + write!(f, "{}", self.separator)?; + } + (self.print)(f, &item)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn can_escape_string() { + let test_cases = vec![ + ("", ""), + ("foo", "foo"), + ("foo\nbar", "foo\\nbar"), + ("\\\n\t\r'", "\\\\\\n\\t\\r\\'"), + ]; + + for (raw, escaped) in test_cases { + assert_eq!(escape_string(raw).to_string().as_str(), escaped) + } + } +} diff --git a/crates/ndc-clickhouse/src/sql/ast/parameter_extractor.rs b/crates/ndc-clickhouse/src/sql/ast/parameter_extractor.rs deleted file mode 100644 index 3864a9c..0000000 --- a/crates/ndc-clickhouse/src/sql/ast/parameter_extractor.rs +++ /dev/null @@ -1,171 +0,0 @@ -use super::*; - -pub struct ParameterExtractor { - parameter_index: usize, - parameters: Vec<(String, String)>, -} - -impl ParameterExtractor { - pub fn extract_statement_parameters( - statement: UnsafeInlinedStatement, - ) -> (ParameterizedStatement, Vec<(String, String)>) { - let mut visitor = Self::new(); - let mut statement = statement.0; - visitor.visit_statement(&mut statement); - (ParameterizedStatement(statement), visitor.parameters) - } - fn new() -> Self { - Self { - parameter_index: 0, - parameters: vec![], - } - } - fn visit_statement(&mut self, statment: &mut Statement) { - self.visit_query(&mut statment.query) - } - fn visit_query(&mut self, query: &mut Query) { - for with_item in query.with.iter_mut() { - self.visit_with(with_item) - } - for select in query.select.iter_mut() { - self.visit_select(select) - } - for from in query.from.iter_mut() { - self.visit_from(from) - } - if let Some(ref mut predicate) = &mut query.predicate { - self.visit_expr(predicate) - } - for group_by in query.group_by.iter_mut() { - self.visit_expr(group_by) - } - for order_by in query.order_by.iter_mut() { - self.visit_expr(&mut order_by.expr) - } - if let Some(ref mut limit_by) = &mut query.limit_by { - self.visit_limit_by(limit_by) - } - } - fn visit_with(&mut self, with: &mut WithItem) { - match with { - WithItem::Expr { expr, alias: _ } => self.visit_expr(expr), - WithItem::Query { query, alias: _ } => self.visit_query(query), - } - } - fn visit_select(&mut self, select: &mut SelectItem) { - match select { - SelectItem::UnnamedExpr(expr) => self.visit_expr(expr), - SelectItem::ExprWithAlias { expr, alias: _ } => self.visit_expr(expr), - SelectItem::QualifiedWildcard(_) => {} - SelectItem::Wildcard => {} - } - } - fn visit_limit_by(&mut self, limit_by: &mut LimitByExpr) { - for expr in limit_by.by.iter_mut() { - self.visit_expr(expr) - } - } - fn visit_expr(&mut self, expr: &mut Expr) { - match expr { - Expr::Identifier(_) => {} - Expr::CompoundIdentifier(_) => {} - Expr::BinaryOp { left, op: _, right } => { - self.visit_expr(left); - self.visit_expr(right); - } - Expr::Not(expr) => self.visit_expr(expr), - Expr::Nested(expr) => self.visit_expr(expr), - Expr::Value(_) => {} - Expr::Parameter(parameter) => self.visit_parameter(parameter), - Expr::Function(function) => self.visit_function(function), - Expr::Lambda(lambda) => self.visit_expr(&mut lambda.expr), - Expr::List(list) => { - for expr in list.iter_mut() { - self.visit_expr(expr) - } - } - } - } - fn visit_from(&mut self, from: &mut TableWithJoins) { - self.visit_relation(&mut from.relation); - for join in from.joins.iter_mut() { - self.visit_join(join) - } - } - fn visit_relation(&mut self, relation: &mut TableFactor) { - match relation { - TableFactor::Table { .. } => {} - TableFactor::Derived { - ref mut subquery, - alias: _, - } => self.visit_query(subquery), - TableFactor::TableFunction { - ref mut function, - alias: _, - } => self.visit_function(function), - TableFactor::NativeQuery { - ref mut native_query, - alias: _, - } => self.visit_native_query(native_query), - } - } - fn visit_join(&mut self, join: &mut Join) { - self.visit_relation(&mut join.relation); - match &mut join.join_operator { - JoinOperator::CrossJoin => {} - JoinOperator::Inner(constraint) - | JoinOperator::LeftOuter(constraint) - | JoinOperator::RightOuter(constraint) - | JoinOperator::FullOuter(constraint) => match constraint { - JoinConstraint::On(expr) => self.visit_expr(expr), - JoinConstraint::Using(_) => {} - JoinConstraint::Natural => {} - JoinConstraint::None => {} - }, - } - } - fn visit_function(&mut self, function: &mut Function) { - for arg in function.args.iter_mut() { - match arg.value { - FunctionArgExpr::Expr(ref mut expr) => self.visit_expr(expr), - FunctionArgExpr::QualifiedWildcard(_) => {} - FunctionArgExpr::Wildcard => {} - } - } - if let Some(ref mut over) = &mut function.over { - for partion_by in over.partition_by.iter_mut() { - self.visit_expr(partion_by) - } - for order_by in over.order_by.iter_mut() { - self.visit_expr(&mut order_by.expr) - } - } - } - fn visit_native_query(&mut self, native_query: &mut NativeQuery) { - for element in native_query.elements.iter_mut() { - match element { - NativeQueryElement::String(_) => {} - NativeQueryElement::Parameter(ref mut parameter) => self.visit_parameter(parameter), - } - } - } - fn visit_parameter(&mut self, parameter: &mut Parameter) { - match parameter { - Parameter::Placeholder { .. } => panic!("Attempted to extract parameter that had already been replaced with a placeholder. This is likely a bug"), - Parameter::Value { data_type, value } => { - // for single quoted string, we want the underlying string, - // not the escaped, quoted version we get by calling to_string() - let value = match value { - Value::SingleQuotedString(s) => s.to_owned(), - _ => value.to_string() - }; - self.parameters.push(( - format!("param_p{}", self.parameter_index), - value.to_string() - )); - *parameter = Parameter::Placeholder { data_type: data_type.to_owned(), name: format!("p{}", self.parameter_index) }; - self.parameter_index += 1; - }, - } - } -} diff --git a/crates/ndc-clickhouse/src/sql/query_builder.rs b/crates/ndc-clickhouse/src/sql/query_builder.rs index ab18d55..68b45e9 100644 --- a/crates/ndc-clickhouse/src/sql/query_builder.rs +++ b/crates/ndc-clickhouse/src/sql/query_builder.rs @@ -1,6 +1,7 @@ mod collection_context; mod comparison_column; mod error; +pub mod parameter; mod typecasting; use self::{collection_context::CollectionContext, typecasting::RowsetTypeString}; @@ -19,6 +20,7 @@ use comparison_column::ComparisonColumn; pub use error::QueryBuilderError; use indexmap::IndexMap; use ndc_sdk::models; +use parameter::ParameterBuilder; use std::{collections::BTreeMap, iter, str::FromStr}; pub struct QueryBuilder<'r, 'c> { @@ -26,6 +28,8 @@ pub struct QueryBuilder<'r, 'c> { configuration: &'c ServerConfig, } +type Parameters = Vec<(String, String)>; + impl<'r, 'c> QueryBuilder<'r, 'c> { pub fn new(request: &'r models::QueryRequest, configuration: &'c ServerConfig) -> Self { Self { @@ -33,10 +37,24 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { configuration, } } - pub fn build(&self) -> Result { - self.root_query() + pub fn build_parameterized(&self) -> Result<(Statement, Parameters), QueryBuilderError> { + let mut parameters = ParameterBuilder::new(false); + let statement = self.root_query(&mut parameters)?; + + let parameters = parameters.into_parameters(); + + Ok((statement, parameters)) + } + pub fn build_inlined(&self) -> Result { + let mut parameters = ParameterBuilder::new(true); + let statement = self.root_query(&mut parameters)?; + + Ok(statement) } - fn root_query(&self) -> Result { + fn root_query( + &self, + parameters: &mut ParameterBuilder, + ) -> Result { let collection = CollectionContext::new(&self.request.collection, &self.request.arguments); let query = &self.request.query; @@ -94,14 +112,10 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { } } - let variables_values = - Parameter::new( - Value::SingleQuotedString(serde_json::to_string(&variable_values).map_err( - |err| QueryBuilderError::CannotSerializeVariables(err.to_string()), - )?), - ClickHouseDataType::String.into(), - ) - .into_expr(); + let variables_values = parameters.bind_string( + &serde_json::to_string(&variable_values) + .map_err(|err| QueryBuilderError::CannotSerializeVariables(err.to_string()))?, + ); vec![Query::default() .select(vec![SelectItem::Wildcard]) @@ -118,7 +132,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { }; let rowset_subquery = self - .rowset_subquery(&collection, &vec![], query)? + .rowset_subquery(&collection, &vec![], query, parameters)? .into_table_factor() .alias("_rowset"); @@ -135,7 +149,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .into_box(), op: BinaryOperator::Eq, right: Expr::CompoundIdentifier(vec![ - Ident::new_quoted("_rowset".to_owned()), + Ident::new_quoted("_rowset"), Ident::new_quoted("_varset_id"), ]) .into_box(), @@ -177,6 +191,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection: &CollectionContext, relkeys: &Vec<&String>, query: &models::Query, + parameters: &mut ParameterBuilder, ) -> Result { let fields = if let Some(fields) = &query.fields { let row = if fields.is_empty() { @@ -286,7 +301,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { } let from = vec![self - .row_subquery(current_collection, relkeys, query)? + .row_subquery(current_collection, relkeys, query, parameters)? .into_table_factor() .alias("_row") .into_table_with_joins(vec![])]; @@ -298,6 +313,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection: &CollectionContext, relkeys: &Vec<&String>, query: &models::Query, + parameters: &mut ParameterBuilder, ) -> Result { let (table, mut base_joins) = if self.request.variables.is_some() { let table = ObjectName(vec![Ident::new_quoted("_vars")]) @@ -305,12 +321,16 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .alias("_vars"); let joins = vec![Join { - relation: self.collection_ident(current_collection)?.alias("_origin"), + relation: self + .collection_ident(current_collection, parameters)? + .alias("_origin"), join_operator: JoinOperator::CrossJoin, }]; (table, joins) } else { - let table = self.collection_ident(current_collection)?.alias("_origin"); + let table = self + .collection_ident(current_collection, parameters)? + .alias("_origin"); (table, vec![]) }; @@ -342,6 +362,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { false, fields.as_ref(), &mut rel_index, + parameters, )? { select.push(expr.into_select(Some(format!("_field_{alias}")))); base_joins.append(&mut joins); @@ -365,6 +386,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { query, relationship, arguments, + parameters, )?; select.push(expr.into_select(Some(format!("_field_{alias}")))); @@ -419,13 +441,14 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, true, &mut 0, + parameters, ) .map(|(expr, joins)| (Some(expr), joins))? } else { (None, vec![]) }; - let (order_by_exprs, order_by_joins) = self.order_by(&query.order_by)?; + let (order_by_exprs, order_by_joins) = self.order_by(&query.order_by, parameters)?; let joins = base_joins .into_iter() @@ -489,6 +512,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { fn order_by( &self, order_by: &Option, + parameters: &mut ParameterBuilder, ) -> Result<(Vec, Vec), QueryBuilderError> { let mut order_by_exprs = vec![]; let mut order_by_joins = vec![]; @@ -588,7 +612,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { } let table = self - .collection_ident(&relationship_collection)? + .collection_ident(&relationship_collection, parameters)? .alias(&join_alias); let (table, base_joins) = if self.request.variables.is_some() { @@ -616,6 +640,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &relationship_collection, false, &mut join_index, + parameters, )?; additional_predicate.push(predicate); @@ -678,7 +703,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .unwrap_or(JoinOperator::CrossJoin); let relation = self - .collection_ident(&relationship_collection)? + .collection_ident(&relationship_collection, parameters)? .alias(&join_alias); let join = Join { @@ -695,6 +720,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &relationship_collection, false, &mut join_index, + parameters, )?; additional_predicate.push(predicate); @@ -836,6 +862,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { Ok((order_by_exprs, order_by_joins)) } + #[allow(clippy::too_many_arguments)] fn field_relationship( &self, field_alias: &str, @@ -844,6 +871,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { query: &models::Query, relationship: &str, arguments: &BTreeMap, + parameters: &mut ParameterBuilder, ) -> Result<(Expr, Join), QueryBuilderError> { let join_alias = format!("_rel_{name_index}_{field_alias}"); *name_index += 1; @@ -900,7 +928,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { let join = Join { relation: self - .rowset_subquery(&relationship_collection, &relkeys, query)? + .rowset_subquery(&relationship_collection, &relkeys, query, parameters)? .into_table_factor() .alias(&join_alias), join_operator, @@ -920,6 +948,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection: &CollectionContext, current_is_origin: bool, name_index: &mut u32, + parameters: &mut ParameterBuilder, ) -> Result<(Expr, Vec), QueryBuilderError> { match expression { models::Expression::And { expressions } => { @@ -932,6 +961,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, ) }) .collect::, _>>()? @@ -963,6 +993,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, ) }) .collect::, _>>()? @@ -991,6 +1022,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, )?; let not_expression = Expr::Not(expression.into_nested().into_box()); Ok((not_expression, joins)) @@ -1002,6 +1034,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, )?; let (expression, joins) = left_col.apply(|left_col| { @@ -1033,6 +1066,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, )?; // special case: right hand data types is assumed to always be the same type as left hand, @@ -1052,9 +1086,10 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection, current_is_origin, name_index, + parameters, )?, models::ComparisonValue::Scalar { value } => ComparisonColumn::new_simple( - Parameter::new(value.into(), right_col_type.clone().into()).into_expr(), + parameters.bind_json(value, right_col_type.clone().into()), right_col_type, ), @@ -1091,6 +1126,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { predicate, current_join_alias, name_index, + parameters, ), } } @@ -1100,6 +1136,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { expression: &Option>, previous_join_alias: &Ident, name_index: &mut u32, + parameters: &mut ParameterBuilder, ) -> Result<(Expr, Vec), QueryBuilderError> { let exists_join_ident = Ident::new_quoted(format!("_exists_{}", name_index)); *name_index += 1; @@ -1130,6 +1167,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &target_collection, false, name_index, + parameters, )?; (Some(predicate), predicate_joins) } @@ -1137,7 +1175,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { }; let table = self - .collection_ident(&target_collection)? + .collection_ident(&target_collection, parameters)? .alias(&subquery_origin_alias); let (table, base_joins) = if self.request.variables.is_some() { @@ -1295,6 +1333,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { current_collection: &CollectionContext, current_is_origin: bool, name_index: &mut u32, + parameters: &mut ParameterBuilder, ) -> Result { match column { models::ComparisonTarget::Column { @@ -1352,7 +1391,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { } let table = self - .collection_ident(&relationship_collection)? + .collection_ident(&relationship_collection, parameters)? .alias(&join_alias); let (table, base_joins) = if self.request.variables.is_some() { @@ -1380,6 +1419,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &relationship_collection, false, &mut join_index, + parameters, )?; additional_predicate.push(predicate); @@ -1433,7 +1473,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .unwrap_or(JoinOperator::CrossJoin); let relation = self - .collection_ident(&relationship_collection)? + .collection_ident(&relationship_collection, parameters)? .alias(&join_alias); let join = Join { @@ -1450,6 +1490,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &relationship_collection, false, &mut join_index, + parameters, )?; additional_predicate.push(predicate); @@ -1608,7 +1649,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .unwrap_or(JoinOperator::CrossJoin); let table = self - .collection_ident(&relationship_collection)? + .collection_ident(&relationship_collection, parameters)? .alias(&join_alias); let join = Join { @@ -1625,6 +1666,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { &relationship_collection, false, name_index, + parameters, )?; additional_predicates.push(predicate); @@ -1699,6 +1741,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { fn collection_ident( &self, collection: &CollectionContext, + parameters: &mut ParameterBuilder, ) -> Result { if let Some(table) = self.configuration.tables.get(collection.alias()) { let table_argument_type = |argument_name: &str| { @@ -1725,13 +1768,11 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { }; Ok(column_ident.into_arg().name(Ident::new_quoted(arg_name))) }; - let literal_argument = |arg_name: &String, value: &serde_json::Value| { - Ok(Expr::Parameter(Parameter::new( - value.into(), - table_argument_type(arg_name)?.to_owned().into(), - )) - .into_arg() - .name(Ident::new_quoted(arg_name))) + let mut literal_argument = |arg_name: &String, value: &serde_json::Value| { + Ok(parameters + .bind_json(value, table_argument_type(arg_name)?.to_owned().into()) + .into_arg() + .name(Ident::new_quoted(arg_name))) }; let table_name = ObjectName(vec![ Ident::new_quoted(&table.schema), @@ -1859,10 +1900,9 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { ParameterizedQueryElement::Parameter(p) => get_argument(p.name.value()) .transpose()? .map(|value| { - NativeQueryElement::Parameter(Parameter::new( - value.into(), - p.r#type.clone(), - )) + NativeQueryElement::Expr( + parameters.bind_json(value, p.r#type.to_owned()), + ) }) .ok_or_else(|| QueryBuilderError::MissingNativeQueryArgument { query: collection.alias().to_owned(), @@ -1918,6 +1958,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { traversed_array: bool, field_selector: Option<&models::NestedField>, rel_index: &mut u32, + parameters: &mut ParameterBuilder, ) -> Result)>, QueryBuilderError> { if let Some(fields) = field_selector { match fields { @@ -1940,6 +1981,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { true, Some(&inner.fields), rel_index, + parameters, )? { if !joins.is_empty() { return Err(QueryBuilderError::Unexpected("column accessor should not return relationship joins after array traversal".to_string())); @@ -2004,6 +2046,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { traversed_array, fields.as_ref(), rel_index, + parameters, )? { accessor_required = true; required_joins.append(&mut joins); @@ -2030,6 +2073,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { query, relationship, arguments, + parameters, )?; required_joins.push(join); diff --git a/crates/ndc-clickhouse/src/sql/query_builder/error.rs b/crates/ndc-clickhouse/src/sql/query_builder/error.rs index 4339229..b04bc92 100644 --- a/crates/ndc-clickhouse/src/sql/query_builder/error.rs +++ b/crates/ndc-clickhouse/src/sql/query_builder/error.rs @@ -106,12 +106,10 @@ impl From for QueryError { | QueryBuilderError::UnknownBinaryComparisonOperator(_) | QueryBuilderError::Typecasting(_) | QueryBuilderError::ColumnTypeMismatch { .. } => { - QueryError::InvalidRequest(value.to_string()) + QueryError::new_invalid_request(&value) } - QueryBuilderError::NotSupported(_) => { - QueryError::UnsupportedOperation(value.to_string()) - } - QueryBuilderError::Unexpected(_) => QueryError::Other(Box::new(value)), + QueryBuilderError::NotSupported(_) => QueryError::new_unsupported_operation(&value), + QueryBuilderError::Unexpected(_) => QueryError::new(value), } } } @@ -132,12 +130,10 @@ impl From for ExplainError { | QueryBuilderError::UnknownBinaryComparisonOperator(_) | QueryBuilderError::Typecasting(_) | QueryBuilderError::ColumnTypeMismatch { .. } => { - ExplainError::InvalidRequest(value.to_string()) - } - QueryBuilderError::NotSupported(_) => { - ExplainError::UnsupportedOperation(value.to_string()) + ExplainError::new_invalid_request(&value) } - QueryBuilderError::Unexpected(_) => ExplainError::Other(Box::new(value)), + QueryBuilderError::NotSupported(_) => ExplainError::new_unsupported_operation(&value), + QueryBuilderError::Unexpected(_) => ExplainError::new(Box::new(value)), } } } diff --git a/crates/ndc-clickhouse/src/sql/query_builder/parameter.rs b/crates/ndc-clickhouse/src/sql/query_builder/parameter.rs new file mode 100644 index 0000000..27618e7 --- /dev/null +++ b/crates/ndc-clickhouse/src/sql/query_builder/parameter.rs @@ -0,0 +1,334 @@ +use super::{format::escape_string, Expr, Parameter, Value}; +use crate::sql::ast::format::display_separated; +use common::clickhouse_parser::{datatype::ClickHouseDataType, parameterized_query::ParameterType}; +use std::fmt::Display; + +pub struct ParameterBuilder { + inline_parameters: bool, + index: u32, + parameters: Vec<(String, String)>, +} + +impl ParameterBuilder { + pub fn new(inline_parameters: bool) -> Self { + Self { + inline_parameters, + index: 0, + parameters: vec![], + } + } + fn bind_parameter(&mut self, value: String, data_type: ParameterType) -> Expr { + let parameter_name = format!("param_p{}", self.index); + let placeholder_name = format!("p{}", self.index); + self.index += 1; + + self.parameters.push((parameter_name, value)); + + let placeholder = Parameter::new(placeholder_name, data_type); + placeholder.into_expr() + } + pub fn bind_json(&mut self, value: &serde_json::Value, data_type: ParameterType) -> Expr { + if self.inline_parameters { + let value: Value = value.into(); + value.into_expr() + } else { + self.bind_parameter(ParameterValue(value).to_string(), data_type) + } + } + pub fn bind_string(&mut self, value: &str) -> Expr { + if self.inline_parameters { + let value = Value::SingleQuotedString(value.to_owned()); + value.into_expr() + } else { + self.bind_parameter( + escape_string(value).to_string(), + ClickHouseDataType::String.into(), + ) + } + } + pub fn into_parameters(self) -> Vec<(String, String)> { + self.parameters + } +} + +struct ParameterValue<'a>(&'a serde_json::Value); + +impl<'a> Display for ParameterValue<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0 { + // top level string should not be quoted + serde_json::Value::String(s) => write!(f, "{}", escape_string(s)), + _ => print_parameter_value(f, self.0), + } + } +} + +fn print_parameter_value( + f: &mut std::fmt::Formatter<'_>, + value: &serde_json::Value, +) -> std::fmt::Result { + match value { + // note: serializing null as \N seems to be a default configuration + // we may need to add a configuration option for this in the future, + // but let's wait until a user actually asks for it + // ref: https://clickhouse.com/docs/en/operations/settings/formats#format_tsv_null_representation + serde_json::Value::Null => write!(f, "\\N"), + serde_json::Value::Bool(b) => { + if *b { + write!(f, "true") + } else { + write!(f, "false") + } + } + serde_json::Value::Number(n) => write!(f, "{n}"), + serde_json::Value::String(s) => write!(f, "'{}'", escape_string(s)), + serde_json::Value::Array(arr) => { + write!( + f, + "[{}]", + display_separated(arr, ",", |f, i| print_parameter_value(f, i)) + ) + } + serde_json::Value::Object(obj) => { + write!( + f, + "{{{}}}", + display_separated(obj, ",", |f, (key, value)| { + write!(f, "'{}':", escape_string(key))?; + print_parameter_value(f, value) + }) + ) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::str::FromStr; + + #[test] + fn can_print_parameter() { + let test_cases = vec![ + (json!("foo"), "foo"), + (json!(true), "true"), + (json!(false), "false"), + (json!({ "foo": "bar"}), "{'foo':'bar'}"), + (json!(["foo", "bar"]), "['foo','bar']"), + (json!(null), "\\N"), + ]; + + for (value, printed) in test_cases { + assert_eq!(ParameterValue(&value).to_string().as_str(), printed) + } + } + #[test] + fn inline_string_parameters() { + let mut parameters = ParameterBuilder::new(true); + + let test_cases = vec![ + ("foo", "'foo'"), + ("foo'bar", "'foo\\'bar'"), + ("foo\nbar", "'foo\\nbar'"), + ("foo\n\\\r\tbar", "'foo\\n\\\\\\r\\tbar'"), + ]; + + for (value, expected) in &test_cases { + let inlined = parameters.bind_string(value); + + assert_eq!( + &inlined.to_string().as_str(), + expected, + "string parameter {} should be inlined as {}", + value, + expected + ) + } + + let bound_parameters = parameters.into_parameters(); + assert_eq!( + bound_parameters, + vec![], + "bound params should be empty after inlining parameters" + ); + } + #[test] + fn inline_json_parameters() { + let mut parameters = ParameterBuilder::new(true); + + let test_cases = vec![ + (json!("foo"), "String", "'foo'"), + (json!(["foo", "bar"]), "Array(String)", "['foo', 'bar']"), + ( + json!({"foo": "bar"}), + "Map(String, String)", + "{'foo': 'bar'}", + ), + (json!(null), "Nullable(String)", "NULL"), + (json!(2), "Int32", "2"), + (json!(true), "Bool", "TRUE"), + (json!(false), "Bool", "FALSE"), + ( + json!({ "foo": ["bar"], "baz": null }), + "Tuple(foo Array(String), baz Nullable(String))", + "{'foo': ['bar'],'baz': NULL}", + ), + ]; + + for (value, data_type_string, expected) in &test_cases { + let data_type = ClickHouseDataType::from_str(data_type_string) + .expect("Data type string should be valid ClickHouseDataType"); + let inlined = parameters.bind_json(value, data_type.into()); + + assert_eq!( + &inlined.to_string().as_str(), + expected, + "parameter {} of type {} should be inlined as {}", + value, + data_type_string, + expected + ) + } + + let bound_parameters = parameters.into_parameters(); + assert_eq!( + bound_parameters, + vec![], + "bound params should be empty after inlining parameters" + ); + } + + #[test] + fn bind_string_parameters() { + let mut parameters = ParameterBuilder::new(false); + + let test_cases = vec![ + ("foo", "{p0:String}", "param_p0", "foo"), + ("foo\rbar", "{p1:String}", "param_p1", "foo\\rbar"), + ("foo'bar", "{p2:String}", "param_p2", "foo\\'bar"), + ( + "foo\\\r\t\nbar", + "{p3:String}", + "param_p3", + "foo\\\\\\r\\t\\nbar", + ), + ]; + let mut placeholders = vec![]; + + for (value, _, _, _) in &test_cases { + let placeholder = parameters.bind_string(value); + placeholders.push(placeholder); + } + + let values = parameters.into_parameters(); + + for ( + ( + (_value, expected_placeholder, expected_param_name, expected_param_value), + placeholder, + ), + (param_name, param_value), + ) in test_cases + .iter() + .zip(placeholders.iter()) + .zip(values.iter()) + { + assert_eq!(expected_placeholder, &placeholder.to_string().as_str()); + assert_eq!(expected_param_name, param_name,); + assert_eq!(expected_param_value, param_value); + } + } + #[test] + fn bind_json_parameters() { + let mut parameters = ParameterBuilder::new(false); + + let test_cases = vec![ + (json!("foo"), "String", "{p0:String}", "param_p0", "foo"), + ( + json!("foo\rbar"), + "String", + "{p1:String}", + "param_p1", + "foo\\rbar", + ), + ( + json!("foo'bar"), + "String", + "{p2:String}", + "param_p2", + "foo\\'bar", + ), + ( + json!("foo\\\r\t\nbar"), + "String", + "{p3:String}", + "param_p3", + "foo\\\\\\r\\t\\nbar", + ), + (json!(1), "UInt32", "{p4:UInt32}", "param_p4", "1"), + ( + json!(null), + "Nullable(String)", + "{p5:Nullable(String)}", + "param_p5", + "\\N", + ), + (json!(true), "Bool", "{p6:Bool}", "param_p6", "true"), + (json!(false), "Bool", "{p7:Bool}", "param_p7", "false"), + ( + json!({"foo": "bar"}), + "Map(String, String)", + "{p8:Map(String, String)}", + "param_p8", + "{'foo':'bar'}", + ), + ( + json!({"foo'\n\r\t\\bar": "baz"}), + "Map(String, String)", + "{p9:Map(String, String)}", + "param_p9", + "{'foo\\'\\n\\r\\t\\\\bar':'baz'}", + ), + ( + json!(["foo", "bar"]), + "Array(String)", + "{p10:Array(String)}", + "param_p10", + "['foo','bar']", + ), + ]; + let mut placeholders = vec![]; + + for (value, data_type_string, _, _, _) in &test_cases { + let data_type = ClickHouseDataType::from_str(data_type_string) + .expect("Data type string should be valid ClickHouseDataType"); + let placeholder = parameters.bind_json(value, data_type.into()); + placeholders.push(placeholder); + } + + let values = parameters.into_parameters(); + + for ( + ( + ( + _value, + _data_type, + expected_placeholder, + expected_param_name, + expected_param_value, + ), + placeholder, + ), + (param_name, param_value), + ) in test_cases + .iter() + .zip(placeholders.iter()) + .zip(values.iter()) + { + assert_eq!(expected_placeholder, &placeholder.to_string().as_str()); + assert_eq!(expected_param_name, param_name,); + assert_eq!(expected_param_value, param_value); + } + } +} diff --git a/crates/ndc-clickhouse/tests/query_builder.rs b/crates/ndc-clickhouse/tests/query_builder.rs index 913dfeb..dd29a2d 100644 --- a/crates/ndc-clickhouse/tests/query_builder.rs +++ b/crates/ndc-clickhouse/tests/query_builder.rs @@ -8,11 +8,11 @@ use tokio::fs; mod test_utils { use common::config::ServerConfig; use ndc_clickhouse::{ - connector::read_server_config, + connector::setup::ClickhouseConnectorSetup, sql::{QueryBuilder, QueryBuilderError}, }; use ndc_sdk::models; - use std::{env, error::Error, path::PathBuf}; + use std::{collections::HashMap, env, error::Error, path::PathBuf}; use tokio::fs; /// when running tests locally, this can be set to true to update reference files @@ -34,11 +34,14 @@ mod test_utils { } async fn read_mock_configuration(schema_dir: &str) -> Result> { // set mock values for required env vars, we won't be reading these anyways - env::set_var("CLICKHOUSE_URL", ""); - env::set_var("CLICKHOUSE_USERNAME", ""); - env::set_var("CLICKHOUSE_PASSWORD", ""); + let env = HashMap::from_iter(vec![ + ("CLICKHOUSE_URL".to_owned(), "".to_owned()), + ("CLICKHOUSE_USERNAME".to_owned(), "".to_owned()), + ("CLICKHOUSE_PASSWORD".to_owned(), "".to_owned()), + ]); + let setup = ClickhouseConnectorSetup::new_from_env(env); let config_dir = config_dir_path(schema_dir); - let configuration = read_server_config(config_dir).await?; + let configuration = setup.read_server_config(config_dir).await?; Ok(configuration) } async fn read_request( @@ -93,8 +96,8 @@ mod test_utils { ) -> Result { let generated_statement = pretty_print_sql( &QueryBuilder::new(request, configuration) - .build()? - .to_unsafe_sql_string(), + .build_inlined()? + .to_string(), ); Ok(generated_statement) }