Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into active-work-per-con…
Browse files Browse the repository at this point in the history
…text
  • Loading branch information
akoshelev committed Sep 27, 2024
2 parents eea8b6a + 59ca3e7 commit bd13038
Show file tree
Hide file tree
Showing 20 changed files with 720 additions and 136 deletions.
4 changes: 4 additions & 0 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ impl RequestHandler for Inner {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.complete(query_id).await?)
}
RouteId::KillQuery => {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.kill(query_id)?)
}
})
}
}
7 changes: 3 additions & 4 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ impl<I: TransportIdentity> GatewaySenders<I> {
match self.inner.entry(channel_id.clone()) {
Entry::Occupied(entry) => Arc::clone(entry.get()),
Entry::Vacant(entry) => {
let sender = Self::new_sender(
&SendChannelConfig::new::<M>(config, total_records),
channel_id.clone(),
);
let config = SendChannelConfig::new::<M>(config, total_records);
tracing::trace!("send configuration for {channel_id:?}: {config:?}");
let sender = Self::new_sender(&config, channel_id.clone());
entry.insert(Arc::clone(&sender));

tokio::spawn({
Expand Down
11 changes: 10 additions & 1 deletion ipa-core/src/helpers/transport/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
},
query::{
NewQueryError, PrepareQueryError, ProtocolResult, QueryCompletionError, QueryInputError,
QueryStatus, QueryStatusError,
QueryKillStatus, QueryKilled, QueryStatus, QueryStatusError,
},
sync::{Arc, Mutex, Weak},
};
Expand Down Expand Up @@ -135,6 +135,13 @@ impl From<QueryStatus> for HelperResponse {
}
}

impl From<QueryKilled> for HelperResponse {
fn from(value: QueryKilled) -> Self {
let v = serde_json::to_vec(&json!({"query_id": value.0, "status": "killed"})).unwrap();
Self { body: v }
}
}

impl<R: AsRef<dyn ProtocolResult>> From<R> for HelperResponse {
fn from(value: R) -> Self {
let v = value.as_ref().to_bytes();
Expand All @@ -156,6 +163,8 @@ pub enum Error {
#[error(transparent)]
QueryStatus(#[from] QueryStatusError),
#[error(transparent)]
QueryKill(#[from] QueryKillStatus),
#[error(transparent)]
DeserializationFailure(#[from] serde_json::Error),
#[error("MalformedRequest: {0}")]
BadRequest(BoxError),
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ impl<I: TransportIdentity> InMemoryTransport<I> {
| RouteId::PrepareQuery
| RouteId::QueryInput
| RouteId::QueryStatus
| RouteId::CompleteQuery => {
| RouteId::CompleteQuery
| RouteId::KillQuery => {
handler
.as_ref()
.expect("Handler is set")
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/helpers/transport/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum RouteId {
QueryInput,
QueryStatus,
CompleteQuery,
KillQuery,
}

/// The header/metadata of the incoming request.
Expand Down
20 changes: 20 additions & 0 deletions ipa-core/src/helpers/transport/stream/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ impl<I: TransportIdentity, S: Stream> StreamCollection<I, S> {
let mut streams = self.inner.lock().unwrap();
streams.clear();
}

/// Returns the number of streams inside this collection.
///
/// ## Panics
/// if mutex is poisoned.
#[cfg(test)]
#[must_use]
pub fn len(&self) -> usize {
self.inner.lock().unwrap().len()
}

/// Returns `true` if this collection is empty.
///
/// ## Panics
/// if mutex is poisoned.
#[must_use]
#[cfg(test)]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// Describes the lifecycle of records stream inside [`StreamCollection`]
Expand Down
78 changes: 78 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,82 @@ pub mod query {

pub const AXUM_PATH: &str = "/:query_id/complete";
}

pub mod kill {
use serde::{Deserialize, Serialize};

use crate::{
helpers::{routing::RouteId, HelperResponse, NoStep, RouteParams},
protocol::QueryId,
};

pub struct Request {
pub query_id: QueryId,
}

impl RouteParams<RouteId, QueryId, NoStep> for Request {
type Params = String;

fn resource_identifier(&self) -> RouteId {
RouteId::KillQuery
}

fn query_id(&self) -> QueryId {
self.query_id
}

fn gate(&self) -> NoStep {
NoStep
}

fn extra(&self) -> Self::Params {
String::new()
}
}

impl Request {
/// Currently, it is only possible to kill
/// a query by issuing an HTTP request manually.
/// Maybe report collector can support this API,
/// but for now, only tests exercise this path
/// hence methods here are hidden behind feature
/// flags
#[cfg(all(test, unit_test))]
pub fn new(query_id: QueryId) -> Self {
Self { query_id }
}

#[cfg(all(test, unit_test))]
pub fn try_into_http_request(
self,
scheme: axum::http::uri::Scheme,
authority: axum::http::uri::Authority,
) -> crate::net::http_serde::OutgoingRequest {
let uri = axum::http::uri::Uri::builder()
.scheme(scheme)
.authority(authority)
.path_and_query(format!(
"{}/{}/kill",
crate::net::http_serde::query::BASE_AXUM_PATH,
self.query_id.as_ref()
))
.build()?;
Ok(hyper::Request::post(uri).body(axum::body::Body::empty())?)
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResponseBody {
pub query_id: QueryId,
pub status: String,
}

impl From<HelperResponse> for ResponseBody {
fn from(value: HelperResponse) -> Self {
serde_json::from_slice(value.into_body().as_slice()).unwrap()
}
}

pub const AXUM_PATH: &str = "/:query_id/kill";
}
}
136 changes: 136 additions & 0 deletions ipa-core/src/net/server/handlers/query/kill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use axum::{extract::Path, routing::post, Extension, Json, Router};
use hyper::StatusCode;

use crate::{
helpers::{ApiError, BodyStream, Transport},
net::{
http_serde::query::{kill, kill::Request},
server::Error,
Error::QueryIdNotFound,
HttpTransport,
},
protocol::QueryId,
query::QueryKillStatus,
sync::Arc,
};

async fn handler(
transport: Extension<Arc<HttpTransport>>,
Path(query_id): Path<QueryId>,
) -> Result<Json<kill::ResponseBody>, Error> {
let req = Request { query_id };
let transport = Transport::clone_ref(&*transport);
match transport.dispatch(req, BodyStream::empty()).await {
Ok(state) => Ok(Json(kill::ResponseBody::from(state))),
Err(ApiError::QueryKill(QueryKillStatus::NoSuchQuery(query_id))) => Err(
Error::application(StatusCode::NOT_FOUND, QueryIdNotFound(query_id)),
),
Err(e) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

pub fn router(transport: Arc<HttpTransport>) -> Router {
Router::new()
.route(kill::AXUM_PATH, post(handler))
.layer(Extension(transport))
}

#[cfg(all(test, unit_test))]
mod tests {
use axum::{
body::Body,
http::uri::{Authority, Scheme},
};
use hyper::StatusCode;

use crate::{
helpers::{
make_owned_handler,
routing::{Addr, RouteId},
ApiError, BodyStream, HelperIdentity, HelperResponse,
},
net::{
http_serde,
server::handlers::query::test_helpers::{
assert_fails_with, assert_fails_with_handler, assert_success_with,
},
},
protocol::QueryId,
query::{QueryKillStatus, QueryKilled},
};

#[tokio::test]
async fn calls_kill() {
let expected_query_id = QueryId;

let handler = make_owned_handler(
move |addr: Addr<HelperIdentity>, _data: BodyStream| async move {
let RouteId::KillQuery = addr.route else {
panic!("unexpected call: {addr:?}");
};
assert_eq!(addr.query_id, Some(expected_query_id));
Ok(HelperResponse::from(QueryKilled(expected_query_id)))
},
);

let req = http_serde::query::kill::Request::new(QueryId);
let req = req
.try_into_http_request(Scheme::HTTP, Authority::from_static("localhost"))
.unwrap();
assert_success_with(req, handler).await;
}

#[tokio::test]
async fn no_such_query() {
let handler = make_owned_handler(
move |_addr: Addr<HelperIdentity>, _data: BodyStream| async move {
Err(QueryKillStatus::NoSuchQuery(QueryId).into())
},
);

let req = http_serde::query::kill::Request::new(QueryId)
.try_into_http_request(Scheme::HTTP, Authority::from_static("localhost"))
.unwrap();
assert_fails_with_handler(req, handler, StatusCode::NOT_FOUND).await;
}

#[tokio::test]
async fn unknown_error() {
let handler = make_owned_handler(
move |_addr: Addr<HelperIdentity>, _data: BodyStream| async move {
Err(ApiError::DeserializationFailure(
serde_json::from_str::<()>("not-a-json").unwrap_err(),
))
},
);

let req = http_serde::query::kill::Request::new(QueryId)
.try_into_http_request(Scheme::HTTP, Authority::from_static("localhost"))
.unwrap();
assert_fails_with_handler(req, handler, StatusCode::INTERNAL_SERVER_ERROR).await;
}

struct OverrideReq {
query_id: String,
}

impl From<OverrideReq> for hyper::Request<Body> {
fn from(val: OverrideReq) -> Self {
let uri = format!(
"http://localhost{}/{}/kill",
http_serde::query::BASE_AXUM_PATH,
val.query_id
);
hyper::Request::post(uri).body(Body::empty()).unwrap()
}
}

#[tokio::test]
async fn malformed_query_id() {
let req = OverrideReq {
query_id: "not-a-query-id".into(),
};

assert_fails_with(req.into(), StatusCode::BAD_REQUEST).await;
}
}
15 changes: 15 additions & 0 deletions ipa-core/src/net/server/handlers/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod create;
mod input;
mod kill;
mod prepare;
mod results;
mod status;
Expand Down Expand Up @@ -31,6 +32,7 @@ pub fn query_router(transport: Arc<HttpTransport>) -> Router {
.merge(create::router(Arc::clone(&transport)))
.merge(input::router(Arc::clone(&transport)))
.merge(status::router(Arc::clone(&transport)))
.merge(kill::router(Arc::clone(&transport)))
.merge(results::router(transport))
}

Expand Down Expand Up @@ -139,6 +141,19 @@ pub mod test_helpers {
assert_eq!(resp.status(), expected_status);
}

pub async fn assert_fails_with_handler(
req: hyper::Request<Body>,
handler: Arc<dyn RequestHandler<Identity = HelperIdentity>>,
expected_status: StatusCode,
) {
let test_server = TestServer::builder()
.with_request_handler(handler)
.build()
.await;
let resp = test_server.server.handle_req(req).await;
assert_eq!(resp.status(), expected_status);
}

pub async fn assert_success_with(
req: hyper::Request<Body>,
handler: Arc<dyn RequestHandler<Identity = HelperIdentity>>,
Expand Down
Loading

0 comments on commit bd13038

Please sign in to comment.