Skip to content

Commit

Permalink
feat: enable POST requests batching with dl. (#3140)
Browse files Browse the repository at this point in the history
Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
laststylebender14 and tusharmath authored Dec 10, 2024
1 parent 626b142 commit f067629
Show file tree
Hide file tree
Showing 42 changed files with 1,622 additions and 178 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ default = ["cli", "js"]
# Feature flag to force JIT engine inside integration tests
force_jit = []


[workspace]
members = [
".",
Expand Down
12 changes: 6 additions & 6 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ directive @http(
batchKey: [String!]
"""
The body of the API call. It's used for methods like POST or PUT that send data to
the server. You can pass it as a static object or use a Mustache template to substitute
variables from the GraphQL variables.
the server. You can pass it as a static object or use a Mustache template with object
to substitute variables from the GraphQL variables.
"""
body: String
body: JSON
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
Expand Down Expand Up @@ -918,10 +918,10 @@ input Http {
batchKey: [String!]
"""
The body of the API call. It's used for methods like POST or PUT that send data to
the server. You can pass it as a static object or use a Mustache template to substitute
variables from the GraphQL variables.
the server. You can pass it as a static object or use a Mustache template with object
to substitute variables from the GraphQL variables.
"""
body: String
body: JSON
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
Expand Down
10 changes: 8 additions & 2 deletions src/core/blueprint/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ pub enum BlueprintError {
#[error("Protobuf files were not specified in the config")]
ProtobufFilesNotSpecifiedInConfig,

#[error("GroupBy is only supported for GET requests")]
GroupByOnlyForGet,
#[error("GroupBy is only supported for GET and POST requests")]
GroupByOnlyForGetAndPost,

#[error("Request body batching requires exactly one dynamic value in the body.")]
BatchRequiresDynamicParameter,

#[error("Batching capability was used without enabling it in upstream")]
IncorrectBatchingUsage,

#[error("batchKey requires either body or query parameters")]
BatchKeyRequiresEitherBodyOrQuery,

#[error("script is required")]
ScriptIsRequired,

Expand Down
107 changes: 92 additions & 15 deletions src/core/blueprint/operators/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ pub fn compile_http(
Err(e) => Valid::from_validation_err(BlueprintError::from_validation_string(e)),
};

Valid::<(), BlueprintError>::fail(BlueprintError::GroupByOnlyForGet)
.when(|| !http.batch_key.is_empty() && http.method != Method::GET)
.and(
Valid::<(), BlueprintError>::fail(BlueprintError::IncorrectBatchingUsage).when(|| {
(config_module.upstream.get_delay() < 1
|| config_module.upstream.get_max_size() < 1)
&& !http.batch_key.is_empty()
}),
)
Valid::<(), BlueprintError>::fail(BlueprintError::IncorrectBatchingUsage)
.when(|| {
(config_module.upstream.get_delay() < 1 || config_module.upstream.get_max_size() < 1)
&& !http.batch_key.is_empty()
})
.and(
Valid::from_iter(http.query.iter(), |query| {
validate_argument(config_module, Mustache::parse(query.value.as_str()), field)
})
.unit()
.trace("query"),
)
.and(
Valid::<(), BlueprintError>::fail(BlueprintError::BatchKeyRequiresEitherBodyOrQuery)
.when(|| {
!http.batch_key.is_empty() && (http.body.is_none() && http.query.is_empty())
}),
)
.and(Valid::succeed(http.url.as_str()))
.zip(mustache_headers)
.and_then(|(base_url, headers)| {
Expand Down Expand Up @@ -67,6 +69,22 @@ pub fn compile_http(
Err(e) => Valid::fail(BlueprintError::Error(e)),
}
})
.and_then(|request_template| {
if !http.batch_key.is_empty() && (http.body.is_some() || http.method != Method::GET) {
if let Some(body) = http.body.as_ref() {
let dynamic_paths = count_dynamic_paths(body);
if dynamic_paths != 1 {
Valid::fail(BlueprintError::BatchRequiresDynamicParameter).trace("body")
} else {
Valid::succeed(request_template)
}
} else {
Valid::fail(BlueprintError::BatchRequiresDynamicParameter).trace("body")
}
} else {
Valid::succeed(request_template)
}
})
.map(|req_template| {
// marge http and upstream on_request
let on_request = http
Expand All @@ -76,13 +94,18 @@ pub fn compile_http(
let on_response_body = http.on_response_body.clone();
let hook = WorkerHooks::try_new(on_request, on_response_body).ok();

let io = if !http.batch_key.is_empty() && http.method == Method::GET {
let io = if !http.batch_key.is_empty() {
// Find a query parameter that contains a reference to the {{.value}} key
let key = http.query.iter().find_map(|q| {
Mustache::parse(&q.value)
.expression_contains("value")
.then(|| q.key.clone())
});
let key = if http.method == Method::GET {
http.query.iter().find_map(|q| {
Mustache::parse(&q.value)
.expression_contains("value")
.then(|| q.key.clone())
})
} else {
None
};

IR::IO(IO::Http {
req_template,
group_by: Some(GroupBy::new(http.batch_key.clone(), key)),
Expand All @@ -105,3 +128,57 @@ pub fn compile_http(
})
.and_then(apply_select)
}

/// Count the number of dynamic expressions in the JSON value.
fn count_dynamic_paths(json: &serde_json::Value) -> usize {
let mut count = 0;
match json {
serde_json::Value::Array(arr) => {
for v in arr {
count += count_dynamic_paths(v)
}
}
serde_json::Value::Object(obj) => {
for (_, v) in obj {
count += count_dynamic_paths(v)
}
}
serde_json::Value::String(s) => {
if !Mustache::parse(s).is_const() {
count += 1;
}
}
_ => {}
}
count
}

#[cfg(test)]
mod test {
use serde_json::json;

use super::*;

#[test]
fn test_extract_expression_keys_from_nested_objects() {
let json = r#"{"body":"d","userId":"{{.value.uid}}","nested":{"other":"{{test}}"}}"#;
let json = serde_json::from_str(json).unwrap();
let keys = count_dynamic_paths(&json);
assert_eq!(keys, 2);
}

#[test]
fn test_extract_expression_keys_from_mixed_json() {
let json = r#"{"body":"d","userId":"{{.value.uid}}","nested":{"other":"{{test}}"},"meta":[{"key": "id", "value": "{{.value.userId}}"}]}"#;
let json = serde_json::from_str(json).unwrap();
let keys = count_dynamic_paths(&json);
assert_eq!(keys, 3);
}

#[test]
fn test_with_non_json_value() {
let json = json!(r#"{{.value}}"#);
let keys = count_dynamic_paths(&json);
assert_eq!(keys, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Index {
),
headers: {},
body: Some(
"{{.args.input}}",
String("{{.args.input}}"),
),
description: None,
encoding: ApplicationJson,
Expand Down Expand Up @@ -127,7 +127,7 @@ Index {
),
headers: {},
body: Some(
"{{.args.input}}",
String("{{.args.input}}"),
),
description: None,
encoding: ApplicationJson,
Expand Down Expand Up @@ -205,7 +205,7 @@ Index {
),
headers: {},
body: Some(
"{{.args.input}}",
String("{{.args.input}}"),
),
description: None,
encoding: ApplicationJson,
Expand Down Expand Up @@ -286,7 +286,7 @@ Index {
),
headers: {},
body: Some(
"{{.args.input}}",
String("{{.args.input}}"),
),
description: None,
encoding: ApplicationJson,
Expand Down
5 changes: 3 additions & 2 deletions src/core/config/directives/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub struct Http {
#[serde(default, skip_serializing_if = "is_default")]
/// The body of the API call. It's used for methods like POST or PUT that
/// send data to the server. You can pass it as a static object or use a
/// Mustache template to substitute variables from the GraphQL variables.
pub body: Option<String>,
/// Mustache template with object to substitute variables from the GraphQL
/// variables.
pub body: Option<Value>,

#[serde(default, skip_serializing_if = "is_default")]
/// The `encoding` parameter specifies the encoding of the request body. It
Expand Down
10 changes: 6 additions & 4 deletions src/core/config/transformer/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl KeysExtractor {
Valid::from_iter(
[
Self::parse_str(http.url.as_str()).trace("url"),
Self::parse_str_option(http.body.as_deref()).trace("body"),
Self::parse_json_option(http.body.as_ref()).trace("body"),
Self::parse_key_value_iter(http.headers.iter()).trace("headers"),
Self::parse_key_value_iter(http.query.iter().map(|q| KeyValue {
key: q.key.to_string(),
Expand Down Expand Up @@ -355,9 +355,9 @@ impl KeysExtractor {
.map_to(keys)
}

fn parse_str_option(s: Option<&str>) -> Valid<Keys, String> {
fn parse_json_option(s: Option<&serde_json::Value>) -> Valid<Keys, String> {
if let Some(s) = s {
Self::parse_str(s)
Self::parse_str(&s.to_string())
} else {
Valid::succeed(Keys::new())
}
Expand Down Expand Up @@ -483,7 +483,9 @@ mod tests {
fn test_extract_http() {
let http = Http {
url: "http://tailcall.run/users/{{.value.id}}".to_string(),
body: Some(r#"{ "obj": "{{.value.obj}}"} "#.to_string()),
body: Some(serde_json::Value::String(
r#"{ "obj": "{{.value.obj}}"} "#.to_string(),
)),
headers: vec![KeyValue {
key: "{{.value.header.key}}".to_string(),
value: "{{.value.header.value}}".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/core/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Endpoint {
pub input: JsonSchema,
pub output: JsonSchema,
pub headers: HeaderMap,
pub body: Option<String>,
pub body: Option<serde_json::Value>,
pub description: Option<String>,
pub encoding: Encoding,
}
Expand Down
5 changes: 4 additions & 1 deletion src/core/generator/json/operation_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ impl OperationTypeGenerator {
let arg_name_gen = NameGenerator::new(prefix.as_str());
let arg_name = arg_name_gen.next();

http_resolver.body = Some(format!("{{{{.args.{}}}}}", arg_name));
http_resolver.body = Some(serde_json::Value::String(format!(
"{{{{.args.{}}}}}",
arg_name
)));
http_resolver.method = request_sample.method.to_owned();

field.args.insert(
Expand Down
9 changes: 5 additions & 4 deletions src/core/generator/proto/connect_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl From<Grpc> for Http {

Self {
url: new_url,
body: body.map(|b| b.to_string()),
body,
method: crate::core::http::Method::POST,
headers,
batch_key,
Expand Down Expand Up @@ -91,7 +91,7 @@ mod tests {

assert_eq!(http.url, "http://localhost:8080/package.service/method");
assert_eq!(http.method, crate::core::http::Method::POST);
assert_eq!(http.body, Some(r#"{"key":"value"}"#.to_string()));
assert_eq!(http.body, Some(json!({"key": "value"})));
}

#[test]
Expand All @@ -109,7 +109,7 @@ mod tests {

let http = Http::from(grpc);

assert_eq!(http.body, Some("{}".to_string()));
assert_eq!(http.body, Some(json!({})));
}

#[test]
Expand All @@ -136,6 +136,7 @@ mod tests {
.value,
"bar".to_string()
);
assert_eq!(http.body, Some(json!({})));
}

#[test]
Expand All @@ -155,7 +156,7 @@ mod tests {

assert_eq!(http.url, "http://localhost:8080/package.service/method");
assert_eq!(http.method, crate::core::http::Method::POST);
assert_eq!(http.body, Some(r#"{"key":"value"}"#.to_string()));
assert_eq!(http.body, Some(json!({"key": "value"})));
assert_eq!(
http.headers
.iter()
Expand Down
Loading

1 comment on commit f067629

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 403.26us 324.92us 19.54ms 94.39%
Req/Sec 58.58k 1.98k 64.00k 71.08%

6994930 requests in 30.00s, 35.06GB read

Requests/sec: 233155.43

Transfer/sec: 1.17GB

Please sign in to comment.