Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
cberkhoff committed Jan 10, 2025
1 parent 2889f1c commit 967fdca
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 27 deletions.
5 changes: 5 additions & 0 deletions ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ impl HelperResponse {
serde_json::from_slice(&self.body)
}

/// Asynchronously collects and returns a newly created `HelperResponse`.
///
/// # Errors
///
/// If the `BytesStream` cannot be collected into a `BytesMut`, an error is returned.
pub async fn from_bytesstream<B: BytesStream>(value: B) -> Result<HelperResponse, BoxError> {
let bytes: bytes::BytesMut = value.try_collect().await?;
Ok(Self {
Expand Down
1 change: 0 additions & 1 deletion ipa-core/src/helpers/transport/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
RoleAssignment, RouteParams,
},
protocol::QueryId,
query::QueryStatus,
};

#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize)]
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/stream/axum_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{error::BoxError, helpers::BytesStream};
pub struct WrappedAxumBodyStream(#[pin] BodyDataStream);

impl WrappedAxumBodyStream {
#[must_use]
pub fn new(b: Body) -> Self {
Self(b.into_data_stream())
}
Expand Down
46 changes: 24 additions & 22 deletions ipa-core/src/net/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
executor::IpaRuntime,
helpers::{
query::{PrepareQuery, QueryConfig, QueryInput},
BodyStream, TransportIdentity, WrappedAxumBodyStream,
BodyStream, TransportIdentity,
},
net::{http_serde, Error, CRYPTO_PROVIDER},
protocol::{Gate, QueryId},
Expand Down Expand Up @@ -385,41 +385,43 @@ impl<F: ConnectionFlavor> IpaHttpClient<F> {
resp_ok(resp).await
}

pub async fn query_status_bytes(&self, query_id: QueryId) -> Result<BodyStream, Error> {
/// Sends a query status request and returns the response bytes.
///
/// # Errors
/// If the request has illegal arguments, or fails to deliver to helper
async fn query_status_impl(&self, query_id: QueryId) -> Result<Bytes, Error> {
let req = http_serde::query::status::Request::new(query_id);
let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?;

let resp = self.request(req).await?;
if resp.status().is_success() {
//let wabs = WrappedAxumBodyStream::new(resp.inner.into_body());
let bytes = response_to_bytes(resp).await?;
let bs = BodyStream::from(bytes.to_vec());
Ok(bs)
Ok(response_to_bytes(resp).await?)
} else {
Err(Error::from_failed_resp(resp).await)
}
}

/// Retrieve the status of a query.
/// Retrieves the status of a query as a byte stream.
///
/// ## Errors
/// This function calls `query_status_impl` and returns the response bytes as a `BodyStream`.
///
/// # Errors
/// If the request has illegal arguments, or fails to deliver to helper
pub async fn query_status_bytes(&self, query_id: QueryId) -> Result<BodyStream, Error> {
let bytes = self.query_status_impl(query_id).await?;
Ok(BodyStream::from(bytes.to_vec()))
}
/// Retrieves the status of a query.
///
/// This function calls `query_status_impl` and deserializes the response bytes into a `QueryStatus` struct.
///
/// # Errors
/// If the request has illegal arguments, or fails to deliver to helper
pub async fn query_status(
&self,
query_id: QueryId,
) -> Result<crate::query::QueryStatus, Error> {
let req = http_serde::query::status::Request::new(query_id);
let req = req.try_into_http_request(self.scheme.clone(), self.authority.clone())?;

let resp = self.request(req).await?;
if resp.status().is_success() {
let bytes = response_to_bytes(resp).await?;
let http_serde::query::status::ResponseBody { status } =
serde_json::from_slice(&bytes)?;
Ok(status)
} else {
Err(Error::from_failed_resp(resp).await)
}
let bytes = self.query_status_impl(query_id).await?;
let http_serde::query::status::ResponseBody { status } = serde_json::from_slice(&bytes)?;
Ok(status)
}
}

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/net/server/handlers/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub fn s2s_router(transport: Arc<HttpTransport<Shard>>) -> Router {
.merge(step::router(Arc::clone(&transport)))
.merge(prepare::router(Arc::clone(&transport)))
.merge(status::router(Arc::clone(&transport)))
.merge(results::router(Arc::clone(&transport)))
.merge(results::router(transport))
.layer(layer_fn(HelperAuthentication::<_, Shard>::new))
}

Expand Down
5 changes: 2 additions & 3 deletions ipa-core/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::{
};

use ::tokio::sync::oneshot::{error::TryRecvError, Receiver};
use futures::{ready, FutureExt, TryFutureExt, TryStreamExt};
use futures::{ready, FutureExt};
use serde::{Deserialize, Serialize};

use crate::{
error::BoxError,
executor::IpaJoinHandle,
helpers::{query::QueryConfig, BytesStream, HelperResponse, RoleAssignment},
helpers::{query::QueryConfig, RoleAssignment},
protocol::QueryId,
query::runner::QueryResult,
sync::Mutex,
Expand Down

0 comments on commit 967fdca

Please sign in to comment.