Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom future type, ResponseFuture, for HTTP responses #1834

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions eng/dict/rust-custom.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
bindgen
impl
impls
newtype
repr
rustc
Expand Down
4 changes: 2 additions & 2 deletions eng/test/mock_transport/src/mock_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl From<MockResponse> for Response {
fn from(mock_response: MockResponse) -> Self {
let bytes_stream: azure_core::BytesStream = mock_response.body.into();

Self::new(
Self::from_stream(
mock_response.status,
mock_response.headers,
Box::pin(bytes_stream),
Expand All @@ -46,7 +46,7 @@ impl MockResponse {
"an error occurred fetching the next part of the byte stream",
)?;

let response = Response::new(
let response = Response::from_stream(
status_code,
header_map.clone(),
Box::pin(BytesStream::new(response_bytes.clone())),
Expand Down
1 change: 0 additions & 1 deletion sdk/core/azure_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,4 @@ features = [
"reqwest_rustls",
"hmac_rust",
"hmac_openssl",
"xml",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just duplicated, you can see it further up if you expand the context

]
12 changes: 11 additions & 1 deletion sdk/core/azure_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
#![deny(missing_debug_implementations, nonstandard_style)]
// #![warn(missing_docs, future_incompatible, unreachable_pub)]

// Docs.rs build is done with the nightly compiler, so we can enable nightly features in that build.
// In this case we enable two features:
// - `doc_auto_cfg`: Automatically scans `cfg` attributes and uses them to show those required configurations in the generated documentation.
// - `doc_cfg_hide`: Ignore the `doc` configuration for `doc_auto_cfg`.
// See https://doc.rust-lang.org/rustdoc/unstable-features.html#doc_auto_cfg-automatically-generate-doccfg for more details.
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(docsrs, feature(doc_cfg_hide))]

#[macro_use]
mod macros;

Expand All @@ -38,7 +46,9 @@ pub use models::*;
pub use options::*;
pub use pipeline::*;
pub use policies::*;
pub use typespec_client_core::http::response::{Model, PinnedStream, Response, ResponseBody};
pub use typespec_client_core::http::{
LazyResponse, PinnedStream, Response, ResponseBody, ResponseFuture,
};

// Re-export typespec types that are not specific to Azure.
pub use typespec::{Error, Result};
Expand Down
12 changes: 4 additions & 8 deletions sdk/cosmos/azure_data_cosmos/examples/cosmos_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,12 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_client = client.database_client(&args.database);
if let Some(container_name) = args.container {
let container_client = db_client.container_client(container_name);
let response = container_client
.read(None)
.await?
.deserialize_body()
.await?;
println!("{:?}", response);
let response = container_client.read(None).await?;
println!("{:?}", response.into_body());
return Ok(());
} else {
let response = db_client.read(None).await?.deserialize_body().await?;
println!("{:?}", response);
let response = db_client.read(None).await?;
println!("{:?}", response.into_body());
}
Ok(())
}
Expand Down
32 changes: 9 additions & 23 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ pub trait ContainerClientMethods {
/// # async fn doc() {
/// # use azure_data_cosmos::clients::{ContainerClient, ContainerClientMethods};
/// # let container_client: ContainerClient = panic!("this is a non-running example");
/// let response = container_client.read(None)
/// .await.unwrap()
/// .deserialize_body()
/// .await.unwrap();
/// let response = container_client.read(None).await.unwrap();
/// # }
/// ```
#[allow(async_fn_in_trait)] // REASON: See https://github.com/Azure/azure-sdk-for-rust/issues/1796 for detailed justification
async fn read(
fn read(
&self,
options: Option<ReadContainerOptions>,
) -> azure_core::Result<azure_core::Response<ContainerProperties>>;
) -> azure_core::ResponseFuture<ContainerProperties>;

/// Executes a single-partition query against items in the container.
///
Expand Down Expand Up @@ -129,17 +126,16 @@ impl ContainerClient {
}

impl ContainerClientMethods for ContainerClient {
async fn read(
fn read(
&self,

#[allow(unused_variables)]
// This is a documented public API so prefixing with '_' is undesirable.
options: Option<ReadContainerOptions>,
) -> azure_core::Result<azure_core::Response<ContainerProperties>> {
let mut req = Request::new(self.container_url.clone(), azure_core::Method::Get);
) -> azure_core::ResponseFuture<ContainerProperties> {
let req = Request::new(self.container_url.clone(), azure_core::Method::Get);
self.pipeline
.send(Context::new(), &mut req, ResourceType::Containers)
.await
.send(Context::new(), req, ResourceType::Containers)
}

fn query_items<T: DeserializeOwned + Send>(
Expand All @@ -159,16 +155,6 @@ impl ContainerClientMethods for ContainerClient {
documents: Vec<M>,
}

// We have to manually implement Model, because the derive macro doesn't support auto-inferring type and lifetime bounds.
// See https://github.com/Azure/azure-sdk-for-rust/issues/1803
impl<M: DeserializeOwned> azure_core::Model for QueryResponseModel<M> {
async fn from_response_body(
body: azure_core::ResponseBody,
) -> typespec_client_core::Result<Self> {
body.json().await
}
}

let mut url = self.container_url.clone();
url.append_path_segments(["docs"]);
let mut base_req = Request::new(url, azure_core::Method::Post);
Expand Down Expand Up @@ -196,7 +182,7 @@ impl ContainerClientMethods for ContainerClient {
}

let resp = pipeline
.send(Context::new(), &mut req, ResourceType::Items)
.send(Context::new(), req, ResourceType::Items)
.await?;

let query_metrics = resp
Expand All @@ -208,7 +194,7 @@ impl ContainerClientMethods for ContainerClient {
let continuation_token =
resp.headers().get_optional_string(&constants::CONTINUATION);

let query_response: QueryResponseModel<T> = resp.deserialize_body().await?;
let query_response: QueryResponseModel<T> = resp.into_body();

let query_results = QueryResults {
items: query_response.documents,
Expand Down
18 changes: 7 additions & 11 deletions sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@ pub trait DatabaseClientMethods {
/// # async fn doc() {
/// # use azure_data_cosmos::clients::{DatabaseClient, DatabaseClientMethods};
/// # let database_client: DatabaseClient = panic!("this is a non-running example");
/// let response = database_client.read(None)
/// .await.unwrap()
/// .deserialize_body()
/// .await.unwrap();
/// let response = database_client.read(None).await.unwrap();
/// # }
/// ```
#[allow(async_fn_in_trait)] // REASON: See https://github.com/Azure/azure-sdk-for-rust/issues/1796 for detailed justification
async fn read(
fn read(
&self,
options: Option<ReadDatabaseOptions>,
) -> azure_core::Result<azure_core::Response<DatabaseProperties>>;
) -> azure_core::ResponseFuture<DatabaseProperties>;

/// Gets a [`ContainerClient`] that can be used to access the collection with the specified name.
///
Expand Down Expand Up @@ -70,17 +67,16 @@ impl DatabaseClient {
}

impl DatabaseClientMethods for DatabaseClient {
async fn read(
fn read(
&self,

#[allow(unused_variables)]
// This is a documented public API so prefixing with '_' is undesirable.
options: Option<ReadDatabaseOptions>,
) -> azure_core::Result<azure_core::Response<DatabaseProperties>> {
let mut req = Request::new(self.database_url.clone(), azure_core::Method::Get);
) -> azure_core::ResponseFuture<DatabaseProperties> {
let req = Request::new(self.database_url.clone(), azure_core::Method::Get);
self.pipeline
.send(Context::new(), &mut req, ResourceType::Databases)
.await
.send(Context::new(), req, ResourceType::Databases)
}

fn container_client(&self, name: impl AsRef<str>) -> ContainerClient {
Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/azure_data_cosmos/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use azure_core::{
date::{ComponentRange, OffsetDateTime},
Continuable, Model,
Continuable,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -72,7 +72,7 @@ pub struct SystemProperties {
/// Properties of a Cosmos DB database.
///
/// Returned by [`DatabaseClient::read()`](crate::clients::DatabaseClient::read()).
#[derive(Model, Debug, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct DatabaseProperties {
/// The ID of the database.
pub id: String,
Expand All @@ -85,7 +85,7 @@ pub struct DatabaseProperties {
/// Properties of a Cosmos DB container.
///
/// Returned by [`ContainerClient::read()`](crate::clients::ContainerClient::read()).
#[derive(Model, Debug, Deserialize)]
#[derive(Debug, Deserialize)]
pub struct ContainerProperties {
/// The ID of the container.
pub id: String,
Expand Down
14 changes: 8 additions & 6 deletions sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod authorization_policy;
use std::sync::Arc;

pub(crate) use authorization_policy::{AuthorizationPolicy, ResourceType};
use serde::de::DeserializeOwned;

/// Newtype that wraps an Azure Core pipeline to provide a Cosmos-specific pipeline which configures our authorization policy and enforces that a [`ResourceType`] is set on the context.
#[derive(Debug, Clone)]
Expand All @@ -25,13 +26,14 @@ impl CosmosPipeline {
))
}

pub async fn send<T>(
&self,
ctx: azure_core::Context<'_>,
request: &mut azure_core::Request,
pub fn send<'a, T: DeserializeOwned>(
&'a self,
ctx: azure_core::Context<'a>,
request: azure_core::Request,
resource_type: ResourceType,
) -> azure_core::Result<azure_core::Response<T>> {
) -> azure_core::ResponseFuture<'a, T> {
// We know all our APIs use JSON, so we can just create a wrapper that calls '.json' for us.
let ctx = ctx.with_value(resource_type);
self.0.send(&ctx, request).await
self.0.send(ctx, request).json()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use openssl::{
use serde::Deserialize;
use std::{str, sync::Arc, time::Duration};
use time::OffsetDateTime;
use typespec_client_core::http::Model;
use url::form_urlencoded;

/// Refresh time to use in seconds.
Expand Down Expand Up @@ -255,7 +254,7 @@ impl ClientCertificateCredential {
return Err(http_response_from_body(rsp_status, &rsp_body).into_error());
}

let response: AadTokenResponse = rsp.deserialize_body_into().await?;
let response: AadTokenResponse = rsp.into_body().json().await?;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Azure Identity doesn't use the full pipeline, it calls the HTTP client itself. It's always used the "Raw Response" and manually deserialized.

Ok(AccessToken::new(
response.access_token,
OffsetDateTime::now_utc() + Duration::from_secs(response.expires_in),
Expand Down Expand Up @@ -326,7 +325,7 @@ impl ClientCertificateCredential {
}
}

#[derive(Model, Deserialize, Debug, Default)]
#[derive(Deserialize, Debug, Default)]
#[serde(default)]
struct AadTokenResponse {
token_type: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub async fn authorize(
let rsp_status = rsp.status();
debug!("rsp_status == {:?}", rsp_status);
if rsp_status.is_success() {
rsp.deserialize_body_into().await
rsp.into_body().json().await
} else {
let rsp_body = rsp.into_body().collect().await?;
let text = std::str::from_utf8(&rsp_body)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use azure_core::credentials::Secret;
use serde::{Deserialize, Deserializer};
use time::OffsetDateTime;
use typespec_client_core::Model;

#[derive(Debug, Clone, Deserialize)]
struct RawLoginResponse {
Expand All @@ -19,7 +18,7 @@ struct RawLoginResponse {
access_token: String,
}

#[derive(Model, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct LoginResponse {
pub token_type: String,
pub expires_in: u64,
Expand Down
7 changes: 2 additions & 5 deletions sdk/identity/azure_identity/src/refresh_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use azure_core::{
use serde::Deserialize;
use std::fmt;
use std::sync::Arc;
use typespec_client_core::Model;
use url::form_urlencoded;

/// Exchange a refresh token for a new access token and refresh token.
Expand Down Expand Up @@ -54,9 +53,7 @@ pub async fn exchange(
let rsp_status = rsp.status();

if rsp_status.is_success() {
rsp.deserialize_body_into()
.await
.map_kind(ErrorKind::Credential)
rsp.into_body().json().await.map_kind(ErrorKind::Credential)
} else {
let rsp_body = rsp.into_body().collect().await?;
let token_error: RefreshTokenError =
Expand All @@ -67,7 +64,7 @@ pub async fn exchange(

/// A refresh token
#[allow(dead_code)]
#[derive(Model, Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct RefreshTokenResponse {
token_type: String,
#[serde(rename = "scope", deserialize_with = "deserialize::split")]
Expand Down
1 change: 0 additions & 1 deletion sdk/typespec/typespec_client_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ tokio = { workspace = true, features = ["macros", "rt", "time"] }
[dev-dependencies]
once_cell.workspace = true
tokio.workspace = true
typespec_derive.workspace = true

[features]
default = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl HttpError {
/// Create an error from an HTTP response.
///
/// This does not check whether the response was successful and should only be used with unsuccessful responses.
pub async fn new(response: Response<()>) -> Self {
pub async fn new(response: Response) -> Self {
let status = response.status();
let headers: HashMap<String, String> = response
.headers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use crate::http::{
headers::{HeaderName, HeaderValue, Headers},
request::{Body, Request},
response::PinnedStream,
HttpClient, Method, Response, StatusCode,
HttpClient, Method, PinnedStream, Response, StatusCode,
};
use async_trait::async_trait;
use futures::TryStreamExt;
Expand Down Expand Up @@ -77,7 +76,11 @@ impl HttpClient for ::reqwest::Client {
)
}));

Ok(Response::new(try_from_status(status)?, headers, body))
Ok(Response::from_stream(
try_from_status(status)?,
headers,
body,
))
}
}

Expand Down
Loading
Loading