diff --git a/CHANGELOG.md b/CHANGELOG.md index c5be231..de2b931 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.6] + +- Add additional trace spans for SQL generation and query execution +- Do not parse db response as JSON, instead send it back directly + ## [0.2.5] - Implement validate cli command diff --git a/Cargo.lock b/Cargo.lock index d95bd4d..a7d6764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,13 +340,15 @@ checksum = "97af0562545a7d7f3d9222fcf909963bec36dcb502afaacab98c6ffac8da47ce" [[package]] name = "common" -version = "0.2.5" +version = "0.2.6" dependencies = [ + "bytes", "peg", "reqwest 0.12.3", "schemars", "serde", "serde_json", + "tracing", ] [[package]] @@ -365,6 +367,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crc32fast" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -468,6 +479,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1044,9 +1065,10 @@ dependencies = [ [[package]] name = "ndc-clickhouse" -version = "0.2.5" +version = "0.2.6" dependencies = [ "async-trait", + "bytes", "common", "indexmap 2.2.5", "ndc-sdk", @@ -1057,11 +1079,12 @@ dependencies = [ "sqlformat", "strum", "tokio", + "tracing", ] [[package]] name = "ndc-clickhouse-cli" -version = "0.2.5" +version = "0.2.6" dependencies = [ "clap", "common", @@ -1072,26 +1095,21 @@ dependencies = [ ] [[package]] -name = "ndc-client" -version = "0.1.1" -source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.1#17c61946cc9a3ff6dcee1d535af33141213b639a" +name = "ndc-models" +version = "0.1.2" +source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.2#6e7d12a31787d5f618099a42ddc0bea786438c00" dependencies = [ - "async-trait", "indexmap 2.2.5", - "opentelemetry", - "reqwest 0.11.27", "schemars", "serde", - "serde_derive", "serde_json", "serde_with", - "url", ] [[package]] name = "ndc-sdk" version = "0.1.0" -source = "git+https://github.com/hasura/ndc-sdk-rs?rev=7b56fac#7b56fac3aba2bc6533d3163111377fd5fbeb3011" +source = "git+https://github.com/hasura/ndc-sdk-rs?rev=972dba6#972dba6e270ad54f4748487f75018c24229c1e5e" dependencies = [ "async-trait", "axum", @@ -1100,12 +1118,13 @@ dependencies = [ "clap", "http 0.2.12", "mime", - "ndc-client", + "ndc-models", "ndc-test", "opentelemetry", "opentelemetry-http", "opentelemetry-otlp", "opentelemetry-semantic-conventions", + "opentelemetry-zipkin", "opentelemetry_sdk", "prometheus", "reqwest 0.11.27", @@ -1122,14 +1141,14 @@ dependencies = [ [[package]] name = "ndc-test" -version = "0.1.1" -source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.1#17c61946cc9a3ff6dcee1d535af33141213b639a" +version = "0.1.2" +source = "git+http://github.com/hasura/ndc-spec.git?tag=v0.1.2#6e7d12a31787d5f618099a42ddc0bea786438c00" dependencies = [ "async-trait", "clap", "colorful", "indexmap 2.2.5", - "ndc-client", + "ndc-models", "rand", "reqwest 0.11.27", "semver", @@ -1137,6 +1156,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "url", ] [[package]] @@ -1310,6 +1330,27 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" +[[package]] +name = "opentelemetry-zipkin" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6943c09b1b7c17b403ae842b00f23e6d5fc6f5ec06cccb3f39aca97094a899a" +dependencies = [ + "async-trait", + "futures-core", + "http 0.2.12", + "once_cell", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest 0.11.27", + "serde", + "serde_json", + "thiserror", + "typed-builder", +] + [[package]] name = "opentelemetry_sdk" version = "0.22.1" @@ -1728,6 +1769,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2283,6 +2337,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", + "flate2", "h2", "http 0.2.12", "http-body 0.4.6", @@ -2291,7 +2346,11 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-native-certs", + "rustls-pemfile 2.1.2", + "rustls-pki-types", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -2448,6 +2507,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-builder" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77739c880e00693faef3d65ea3aad725f196da38b22fdc7ea6ded6e1ce4d3add" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f718dfaf347dcb5b983bfc87608144b0bad87970aebcbea5ce44d2a30c08e63" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.53", +] + [[package]] name = "unicase" version = "2.7.0" diff --git a/Cargo.toml b/Cargo.toml index fb58891..948f6e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,5 @@ members = [ ] resolver = "2" -package.version = "0.2.5" +package.version = "0.2.6" package.edition = "2021" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 0cf7815..ee7b01a 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -4,11 +4,13 @@ version.workspace = true edition.workspace = true [dependencies] +bytes = "1.6.0" +peg = "0.8.2" reqwest = { version = "0.12.3", features = [ "json", "rustls-tls", ], default-features = false } +schemars = "0.8.16" serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" -peg = "0.8.2" -schemars = "0.8.16" +tracing = "0.1.40" diff --git a/crates/common/src/client.rs b/crates/common/src/client.rs index 29f99ef..25d8aa8 100644 --- a/crates/common/src/client.rs +++ b/crates/common/src/client.rs @@ -1,6 +1,8 @@ use std::error::Error; +use bytes::Bytes; use serde::{de::DeserializeOwned, Deserialize}; +use tracing::Instrument; use crate::config::ConnectionConfig; @@ -12,7 +14,35 @@ pub fn get_http_client( Ok(client) } -pub async fn execute_query( +pub async fn execute_query( + client: &reqwest::Client, + connection_config: &ConnectionConfig, + statement: &str, + parameters: &Vec<(String, String)>, +) -> Result> { + let response = client + .post(&connection_config.url) + .header("X-ClickHouse-User", &connection_config.username) + .header("X-ClickHouse-Key", &connection_config.password) + .query(parameters) + .body(statement.to_owned()) + .send() + .instrument(tracing::info_span!("Execute HTTP request")) + .await?; + + if response.error_for_status_ref().is_err() { + return Err(response.text().await?.into()); + } + + let response = response + .bytes() + .instrument(tracing::info_span!("Read HTTP response")) + .await?; + + Ok(response) +} + +pub async fn execute_json_query( client: &reqwest::Client, connection_config: &ConnectionConfig, statement: &str, @@ -25,13 +55,17 @@ pub async fn execute_query( .query(parameters) .body(statement.to_owned()) .send() + .instrument(tracing::info_span!("Execute HTTP request")) .await?; if response.error_for_status_ref().is_err() { return Err(response.text().await?.into()); } - let payload: ClickHouseResponse = response.json().await?; + let payload: ClickHouseResponse = response + .json() + .instrument(tracing::info_span!("Parse HTTP response")) + .await?; Ok(payload.data) } diff --git a/crates/ndc-clickhouse-cli/src/database_introspection.rs b/crates/ndc-clickhouse-cli/src/database_introspection.rs index 4ef71be..e555507 100644 --- a/crates/ndc-clickhouse-cli/src/database_introspection.rs +++ b/crates/ndc-clickhouse-cli/src/database_introspection.rs @@ -3,7 +3,7 @@ use std::error::Error; use serde::Deserialize; use common::{ - client::{execute_query, get_http_client}, + client::{execute_json_query, get_http_client}, config::ConnectionConfig, }; @@ -44,5 +44,5 @@ pub async fn introspect_database( ) -> Result, Box> { let introspection_sql = include_str!("./database_introspection.sql"); let client = get_http_client(connection_config)?; - execute_query::(&client, connection_config, introspection_sql, &vec![]).await + execute_json_query::(&client, connection_config, introspection_sql, &vec![]).await } diff --git a/crates/ndc-clickhouse/Cargo.toml b/crates/ndc-clickhouse/Cargo.toml index 2d37b3b..059862b 100644 --- a/crates/ndc-clickhouse/Cargo.toml +++ b/crates/ndc-clickhouse/Cargo.toml @@ -5,9 +5,10 @@ edition.workspace = true [dependencies] async-trait = "0.1.78" +bytes = "1.6.0" common = { path = "../common" } indexmap = "2.1.0" -ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs", rev = "7b56fac", package = "ndc-sdk" } +ndc-sdk = { git = "https://github.com/hasura/ndc-sdk-rs", rev = "972dba6", package = "ndc-sdk" } prometheus = "0.13.3" reqwest = { version = "0.12.3", features = [ "json", @@ -18,3 +19,4 @@ serde_json = "1.0.114" sqlformat = "0.2.3" strum = { version = "0.26.2", features = ["derive"] } tokio = "1.36.0" +tracing = "0.1.40" diff --git a/crates/ndc-clickhouse/src/connector.rs b/crates/ndc-clickhouse/src/connector.rs index ab8fc71..d8383b9 100644 --- a/crates/ndc-clickhouse/src/connector.rs +++ b/crates/ndc-clickhouse/src/connector.rs @@ -88,9 +88,7 @@ impl Connector for ClickhouseConnector { async fn get_schema( configuration: &Self::Configuration, ) -> Result, SchemaError> { - handler::schema(configuration) - .await - .map(JsonResponse::Value) + handler::schema(configuration).await } async fn query_explain( @@ -98,10 +96,7 @@ impl Connector for ClickhouseConnector { state: &Self::State, request: models::QueryRequest, ) -> Result, ExplainError> { - handler::explain(configuration, state, request) - .await - .map(JsonResponse::Value) - .map_err(|err| ExplainError::Other(err.to_string().into())) + handler::explain(configuration, state, request).await } async fn mutation_explain( @@ -129,10 +124,7 @@ impl Connector for ClickhouseConnector { state: &Self::State, request: models::QueryRequest, ) -> Result, QueryError> { - handler::query(configuration, state, request) - .await - .map(JsonResponse::Value) - .map_err(|err| QueryError::Other(err.to_string().into())) + handler::query(configuration, state, request).await } } diff --git a/crates/ndc-clickhouse/src/connector/handler/explain.rs b/crates/ndc-clickhouse/src/connector/handler/explain.rs index a7f8117..860a387 100644 --- a/crates/ndc-clickhouse/src/connector/handler/explain.rs +++ b/crates/ndc-clickhouse/src/connector/handler/explain.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; -use common::{client::execute_query, config::ServerConfig}; -use ndc_sdk::{connector::ExplainError, models}; +use common::{client::execute_json_query, config::ServerConfig}; +use ndc_sdk::{connector::ExplainError, json_response::JsonResponse, models}; use serde::{Deserialize, Serialize}; use crate::{connector::state::ServerState, sql::QueryBuilder}; @@ -15,7 +15,7 @@ pub async fn explain( configuration: &ServerConfig, state: &ServerState, request: models::QueryRequest, -) -> Result { +) -> Result, ExplainError> { let unsafe_statement = QueryBuilder::new(&request, configuration) .build() .map_err(|err| ExplainError::Other(Box::new(err)))?; @@ -31,7 +31,7 @@ pub async fn explain( .await .map_err(|err| ExplainError::Other(err.to_string().into()))?; - let explain = execute_query::( + let explain = execute_json_query::( &client, &configuration.connection, &statement_string, @@ -65,7 +65,7 @@ pub async fn explain( ("Execution Plan".to_string(), explain), ]); - Ok(models::ExplainResponse { details }) + Ok(JsonResponse::Value(models::ExplainResponse { details })) } fn pretty_print_sql(query: &str) -> String { diff --git a/crates/ndc-clickhouse/src/connector/handler/query.rs b/crates/ndc-clickhouse/src/connector/handler/query.rs index f954a7f..5020aa8 100644 --- a/crates/ndc-clickhouse/src/connector/handler/query.rs +++ b/crates/ndc-clickhouse/src/connector/handler/query.rs @@ -1,5 +1,6 @@ use common::{client::execute_query, config::ServerConfig}; -use ndc_sdk::{connector::QueryError, models}; +use ndc_sdk::{connector::QueryError, json_response::JsonResponse, models}; +use tracing::{Instrument, Level}; use crate::{connector::state::ServerState, sql::QueryBuilder}; @@ -7,28 +8,37 @@ pub async fn query( configuration: &ServerConfig, state: &ServerState, request: models::QueryRequest, -) -> Result { - let statement = QueryBuilder::new(&request, configuration) - .build() - .map_err(|err| QueryError::Other(Box::new(err)))?; +) -> Result, QueryError> { + let (statement_string, parameters) = + tracing::info_span!("Build SQL Query").in_scope(|| -> Result<_, QueryError> { + let statement = QueryBuilder::new(&request, configuration) + .build() + .map_err(|err| QueryError::Other(Box::new(err)))?; - let (statement, parameters) = statement.into_parameterized_statement(); + let (statement, parameters) = statement.into_parameterized_statement(); - let statement_string = statement.to_parameterized_sql_string(); + let statement_string = statement.to_parameterized_sql_string(); + + Ok((statement_string, parameters)) + })?; let client = state .client(configuration) .await .map_err(|err| QueryError::Other(err.to_string().into()))?; - let rowsets = execute_query::( + let execution_span = tracing::info_span!("Execute SQL query", "query.SQL" = statement_string); + + let rowsets = execute_query( &client, &configuration.connection, &statement_string, ¶meters, ) + .instrument(execution_span) .await .map_err(|err| QueryError::Other(err.to_string().into()))?; - Ok(models::QueryResponse(rowsets)) + // we assume the response is a valid JSON string, and send those bytes back without parsing + Ok(JsonResponse::Serialized(rowsets)) } diff --git a/crates/ndc-clickhouse/src/connector/handler/schema.rs b/crates/ndc-clickhouse/src/connector/handler/schema.rs index 9fdb524..72578d9 100644 --- a/crates/ndc-clickhouse/src/connector/handler/schema.rs +++ b/crates/ndc-clickhouse/src/connector/handler/schema.rs @@ -7,10 +7,12 @@ use common::{ config::ServerConfig, config_file::{ParameterizedQueryExposedAs, PrimaryKey}, }; -use ndc_sdk::{connector::SchemaError, models}; +use ndc_sdk::{connector::SchemaError, json_response::JsonResponse, models}; use std::collections::BTreeMap; -pub async fn schema(configuration: &ServerConfig) -> Result { +pub async fn schema( + configuration: &ServerConfig, +) -> Result, SchemaError> { let mut scalar_type_definitions = BTreeMap::new(); let mut object_type_definitions = vec![]; @@ -200,7 +202,7 @@ pub async fn schema(configuration: &ServerConfig) -> Result Result QueryBuilder<'r, 'c> { pub fn build(&self) -> Result { self.root_query() } - fn rows_typecast_string( - &self, - fields: &IndexMap, - current_collection: &CollectionContext, - ) -> Result { - Ok(RowsTypeString::new( - current_collection.alias(), - fields, - &self.request.collection_relationships, - self.configuration, - ) - .map_err(|err| QueryBuilderError::Typecasting(err.to_string()))? - .to_string()) - } - fn agregates_typecast_string( - &self, - aggregates: &IndexMap, - current_collection: &CollectionContext, - ) -> Result { - Ok( - AggregatesTypeString::new(current_collection.alias(), aggregates, self.configuration) - .map_err(|err| QueryBuilderError::Typecasting(err.to_string()))? - .to_string(), - ) - } fn root_query(&self) -> Result { let collection = CollectionContext::new(&self.request.collection, &self.request.arguments); let query = &self.request.query; - let get_typecasting_wrapper = |index: usize, alias: &str, typecast_string: String| { - Function::new_unquoted("cast") - .args(vec![ - Function::new_unquoted("tupleElement") - .args(vec![ - Expr::CompoundIdentifier(vec![ - Ident::new_quoted("_rowset"), - Ident::new_quoted("_rowset"), - ]) - .into_arg(), - Expr::Value(Value::Number(index.to_string())).into_arg(), + let select = vec![Function::new_unquoted("toJSONString") + .args(vec![Function::new_unquoted("groupArray") + .args(vec![Function::new_unquoted("cast") + .args(vec![ + Expr::CompoundIdentifier(vec![ + Ident::new_quoted("_rowset"), + Ident::new_quoted("_rowset"), ]) - .into_expr() .into_arg(), - Expr::Value(Value::SingleQuotedString(typecast_string)).into_arg(), - ]) + Expr::Value(Value::SingleQuotedString( + RowsetTypeString::new( + collection.alias(), + query, + &self.request.collection_relationships, + &self.configuration, + ) + .map_err(|err| QueryBuilderError::Typecasting(err.to_string()))? + .to_string(), + )) + .into_arg(), + ]) + .into_expr() + .into_arg()]) .into_expr() - .into_select(Some(alias)) - }; - - let select = match (&self.request.query.fields, &self.request.query.aggregates) { - (None, None) => vec![Expr::Value(Value::Null).into_select::(None)], - (None, Some(aggregates)) => vec![get_typecasting_wrapper( - 1, - "aggregates", - self.agregates_typecast_string(aggregates, &collection)?, - )], - (Some(fields), None) => vec![get_typecasting_wrapper( - 1, - "rows", - self.rows_typecast_string(fields, &collection)?, - )], - (Some(fields), Some(aggregates)) => vec![ - get_typecasting_wrapper(1, "rows", self.rows_typecast_string(fields, &collection)?), - get_typecasting_wrapper( - 2, - "aggregates", - self.agregates_typecast_string(aggregates, &collection)?, - ), - ], - }; + .into_arg()]) + .into_expr() + .into_select(Some("rowsets"))]; let with = if let Some(variables) = &self.request.variables { let mut variable_values: IndexMap> = IndexMap::new(); @@ -205,8 +165,7 @@ impl<'r, 'c> QueryBuilder<'r, 'c> { .select(select) .from(from) .order_by(order_by) - .into_statement() - .format("JSON")) + .into_statement()) } fn rowset_subquery( &self,