Skip to content

Commit

Permalink
remove async_graphql::Request usage
Browse files Browse the repository at this point in the history
  • Loading branch information
meskill committed Dec 20, 2024
1 parent ab557cf commit d16e2e0
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 218 deletions.
197 changes: 81 additions & 116 deletions src/core/async_graphql_hyper.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::any::Any;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

use anyhow::Result;
use async_graphql::parser::types::{ExecutableDocument, OperationType};
use async_graphql::{BatchResponse, Executor, Value};
use async_graphql::parser::types::ExecutableDocument;
use async_graphql::{BatchResponse, Value};
use async_graphql_value::ConstValue;
use http::header::{HeaderMap, HeaderValue, CACHE_CONTROL, CONTENT_TYPE};
use http::{Response, StatusCode};
use hyper::Body;
Expand All @@ -13,32 +14,17 @@ use tailcall_hasher::TailcallHasher;

use super::jit::{BatchResponse as JITBatchResponse, JITExecutor};

// TODO: replace usage with some other implementation.
// This one is used to calculate hash and use the value later
// as a key in the HashMap. But such use could lead to potential
// issues in case of hash collisions
#[derive(PartialEq, Eq, Clone, Hash, Debug)]
pub struct OperationId(u64);

#[async_trait::async_trait]
pub trait GraphQLRequestLike: Hash + Send {
fn data<D: Any + Clone + Send + Sync>(self, data: D) -> Self;
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
E: Executor;

async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse;

fn parse_query(&mut self) -> Option<&ExecutableDocument>;

fn is_query(&mut self) -> bool {
self.parse_query()
.map(|a| {
let mut is_query = false;
for (_, operation) in a.operations.iter() {
is_query = operation.node.ty == OperationType::Query;
}
is_query
})
.unwrap_or(false)
}

fn operation_id(&self, headers: &HeaderMap) -> OperationId {
let mut hasher = TailcallHasher::default();
let state = &mut hasher;
Expand All @@ -51,86 +37,101 @@ pub trait GraphQLRequestLike: Hash + Send {
}
}

#[derive(Debug, Deserialize)]
pub struct GraphQLBatchRequest(pub async_graphql::BatchRequest);
impl GraphQLBatchRequest {}
impl Hash for GraphQLBatchRequest {
//TODO: Fix Hash implementation for BatchRequest, which should ideally batch
// execution of individual requests instead of the whole chunk of requests as
// one.
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
for request in self.0.iter() {
request.query.hash(state);
request.operation_name.hash(state);
for (name, value) in request.variables.iter() {
name.hash(state);
value.to_string().hash(state);
}
}
}
#[derive(Debug, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BatchWrapper<T> {
Single(T),
Batch(Vec<T>),
}
#[async_trait::async_trait]
impl GraphQLRequestLike for GraphQLBatchRequest {
fn data<D: Any + Clone + Send + Sync>(mut self, data: D) -> Self {
for request in self.0.iter_mut() {
request.data.insert(data.clone());
}
self
}

async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
GraphQLArcResponse::new(executor.execute_batch(self.0).await)
}

/// Shortcut method to execute the request on the executor.
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
E: Executor,
{
GraphQLResponse(executor.execute_batch(self.0).await)
}
pub type GraphQLBatchRequest = BatchWrapper<GraphQLRequest>;

fn parse_query(&mut self) -> Option<&ExecutableDocument> {
None
#[async_trait::async_trait]
impl GraphQLRequestLike for BatchWrapper<GraphQLRequest> {
async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
GraphQLArcResponse::new(executor.execute_batch(self).await)
}
}

#[derive(Debug, Deserialize)]
pub struct GraphQLRequest(pub async_graphql::Request);
#[derive(Debug, Default, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GraphQLRequest {
#[serde(default)]
pub query: String,
#[serde(default)]
pub operation_name: Option<String>,
#[serde(default)]
pub variables: HashMap<String, ConstValue>,
#[serde(default)]
pub extensions: HashMap<String, ConstValue>,
}

impl GraphQLRequest {}
impl Hash for GraphQLRequest {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.query.hash(state);
self.0.operation_name.hash(state);
for (name, value) in self.0.variables.iter() {
self.query.hash(state);
self.operation_name.hash(state);
for (name, value) in self.variables.iter() {
name.hash(state);
value.to_string().hash(state);
}
}
}

impl GraphQLRequest {
pub fn new(query: impl Into<String>) -> Self {
Self { query: query.into(), ..Default::default() }
}
}

#[async_trait::async_trait]
impl GraphQLRequestLike for GraphQLRequest {
#[must_use]
fn data<D: Any + Send + Sync>(mut self, data: D) -> Self {
self.0.data.insert(data);
self
}
async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
let response = executor.execute(self.0).await;
let response = executor.execute(self).await;
GraphQLArcResponse::new(JITBatchResponse::Single(response))
}
}

#[derive(Debug)]
pub struct ParsedGraphQLRequest {
pub query: String,
pub operation_name: Option<String>,
pub variables: HashMap<String, ConstValue>,
pub extensions: HashMap<String, ConstValue>,
pub parsed_query: ExecutableDocument,
}

impl Hash for ParsedGraphQLRequest {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.query.hash(state);
self.operation_name.hash(state);
for (name, value) in self.variables.iter() {
name.hash(state);
value.to_string().hash(state);
}
}
}

impl TryFrom<GraphQLRequest> for ParsedGraphQLRequest {
type Error = async_graphql::parser::Error;

/// Shortcut method to execute the request on the schema.
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
E: Executor,
{
GraphQLResponse(executor.execute(self.0).await.into())
fn try_from(req: GraphQLRequest) -> std::result::Result<Self, Self::Error> {
let parsed_query = async_graphql::parser::parse_query(&req.query)?;

Ok(Self {
query: req.query,
operation_name: req.operation_name,
variables: req.variables,
extensions: req.extensions,
parsed_query,
})
}
}

fn parse_query(&mut self) -> Option<&ExecutableDocument> {
self.0.parsed_query().ok()
#[async_trait::async_trait]
impl GraphQLRequestLike for ParsedGraphQLRequest {
async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
let response = executor.execute(self).await;
GraphQLArcResponse::new(JITBatchResponse::Single(response))
}
}

Expand All @@ -148,42 +149,6 @@ impl From<async_graphql::Response> for GraphQLResponse {
}
}

impl From<GraphQLQuery> for GraphQLRequest {
fn from(query: GraphQLQuery) -> Self {
let mut request = async_graphql::Request::new(query.query);

if let Some(operation_name) = query.operation_name {
request = request.operation_name(operation_name);
}

if let Some(variables) = query.variables {
let value = serde_json::from_str(&variables).unwrap_or_default();
let variables = async_graphql::Variables::from_json(value);
request = request.variables(variables);
}

GraphQLRequest(request)
}
}

#[derive(Debug)]
pub struct GraphQLQuery {
query: String,
operation_name: Option<String>,
variables: Option<String>,
}

impl GraphQLQuery {
/// Shortcut method to execute the request on the schema.
pub async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
E: Executor,
{
let request: GraphQLRequest = self.into();
request.execute(executor).await
}
}

static APPLICATION_JSON: Lazy<HeaderValue> =
Lazy::new(|| HeaderValue::from_static("application/json"));

Expand Down
3 changes: 1 addition & 2 deletions src/core/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,11 @@ async fn handle_rest_apis(
{ HTTP_ROUTE } = http_route
);
return async {
let mut graphql_request = p_request.into_request(body).await?;
let graphql_request = p_request.into_request(body).await?;
let operation_id = graphql_request.operation_id(&req.headers);
let exec = JITExecutor::new(app_ctx.clone(), req_ctx.clone(), operation_id)
.flatten_response(true);
let mut response = graphql_request
.data(req_ctx.clone())
.execute_with_jit(exec)
.await
.set_cache_control(
Expand Down
4 changes: 2 additions & 2 deletions src/core/jit/exec_const.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use super::context::Context;
use super::exec::{Executor, IRExecutor};
use super::graphql_error::GraphQLError;
use super::{transform, AnyResponse, BuildError, Error, OperationPlan, Request, Response, Result};
use crate::core::app_context::AppContext;
use crate::core::http::RequestContext;
use crate::core::ir::model::IR;
use crate::core::ir::{self, EmptyResolverContext, EvalContext};
use crate::core::jit::synth::Synth;
use crate::core::jit::transform::InputResolver;
use crate::core::json::{JsonLike, JsonLikeList};
use crate::core::json::{JsonLike, JsonLikeList, JsonObjectLike};
use crate::core::Transform;
use crate::core::{app_context::AppContext, json::JsonObjectLike};

/// A specialized executor that executes with async_graphql::Value
#[derive(Setters)]
Expand Down
Loading

1 comment on commit d16e2e0

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 4.73ms 2.22ms 33.92ms 77.46%
Req/Sec 5.41k 717.75 8.36k 93.75%

646061 requests in 30.03s, 3.24GB read

Requests/sec: 21515.77

Transfer/sec: 110.43MB

Please sign in to comment.