From 239a5d88419462669c475956c457157651dcd1bc Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 3 Feb 2024 15:19:38 +0000 Subject: [PATCH] Improve smart_git performance via output streaming --- src/methods/repo/mod.rs | 2 - src/methods/repo/smart_git.rs | 136 ++++++++++++++++++++++++++-------- 2 files changed, 106 insertions(+), 32 deletions(-) diff --git a/src/methods/repo/mod.rs b/src/methods/repo/mod.rs index 6280480..257b3df 100644 --- a/src/methods/repo/mod.rs +++ b/src/methods/repo/mod.rs @@ -77,8 +77,6 @@ where let mut service = match uri_parts.pop() { Some("about") => h!(handle_about), - // TODO: https://man.archlinux.org/man/git-http-backend.1.en - // TODO: GIT_PROTOCOL Some("refs") if uri_parts.last() == Some(&"info") => { uri_parts.pop(); h!(handle_smart_git) diff --git a/src/methods/repo/smart_git.rs b/src/methods/repo/smart_git.rs index 7d4463c..de98b30 100644 --- a/src/methods/repo/smart_git.rs +++ b/src/methods/repo/smart_git.rs @@ -1,19 +1,25 @@ -use std::{io::ErrorKind, path::Path, process::Stdio, str::FromStr}; +use std::{io, io::ErrorKind, path::Path, process::Stdio, str::FromStr}; -use anyhow::{bail, Context}; +use anyhow::{anyhow, Context}; use axum::{ - body::{boxed, Body}, + body::StreamBody, extract::BodyStream, headers::{HeaderMap, HeaderName, HeaderValue}, http::{Method, Uri}, - response::Response, + response::{IntoResponse, Response}, Extension, }; +use bytes::{Buf, Bytes, BytesMut}; use futures::TryStreamExt; use httparse::Status; -use tokio::process::Command; +use tokio::{ + io::AsyncReadExt, + process::{Child, ChildStderr, ChildStdout, Command}, + sync::mpsc, +}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::StreamReader; -use tracing::warn; +use tracing::{debug, error, info_span, warn, Instrument}; use crate::{ methods::repo::{Repository, RepositoryPath, Result}, @@ -28,7 +34,7 @@ pub async fn handle( uri: Uri, headers: HeaderMap, body: BodyStream, -) -> Result { +) -> Result { let path = extract_path(&uri, &repository)?; let mut command = Command::new("git"); @@ -51,35 +57,104 @@ pub async fn handle( .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) + .kill_on_drop(true) .spawn() .context("Failed to spawn git http-backend")?; - { - let mut body = - StreamReader::new(body.map_err(|e| std::io::Error::new(ErrorKind::Other, e))); - let mut stdin = child.stdin.take().context("Stdin already taken")?; + let mut stdout = child.stdout.take().context("Stdout already taken")?; + let mut stderr = child.stderr.take().context("Stderr already taken")?; + let mut stdin = child.stdin.take().context("Stdin already taken")?; + + // read request body and forward to stdin + let mut body = StreamReader::new(body.map_err(|e| std::io::Error::new(ErrorKind::Other, e))); + tokio::io::copy_buf(&mut body, &mut stdin) + .await + .context("Failed to copy bytes from request to command stdin")?; - tokio::io::copy_buf(&mut body, &mut stdin) + // wait for the headers back from git http-backend + let mut out_buf = BytesMut::with_capacity(1024); + let headers = loop { + let n = stdout + .read_buf(&mut out_buf) .await - .context("Failed to copy bytes from request to command stdin")?; + .context("Failed to read headers")?; + if n == 0 { + break None; + } + + if let Some((body_offset, headers)) = parse_cgi_headers(&out_buf)? { + out_buf.advance(body_offset); + break Some(headers); + } + }; + + // if the `headers` loop broke with `None`, the `git http-backend` didn't return any parseable + // headers so there's no reason for us to continue. there may be something in stderr for us + // though. + let Some(headers) = headers else { + print_status(&mut child, &mut stderr).await; + return Err(anyhow!("Received incomplete response from git http-backend").into()); + }; + + // stream the response back to the client + let (body_send, body_recv) = mpsc::channel(8); + tokio::spawn( + forward_response_to_client(out_buf, body_send, stdout, stderr, child) + .instrument(info_span!("git http-backend reader")), + ); + + Ok((headers, StreamBody::new(ReceiverStream::new(body_recv)))) +} + +/// Forwards the entirety of `stdout` to `body_send`, printing subprocess stderr and status on +/// completion. +async fn forward_response_to_client( + mut out_buf: BytesMut, + body_send: mpsc::Sender>, + mut stdout: ChildStdout, + mut stderr: ChildStderr, + mut child: Child, +) { + loop { + let (out, mut end) = match stdout.read_buf(&mut out_buf).await { + Ok(0) => (Ok(out_buf.split().freeze()), true), + Ok(n) => (Ok(out_buf.split_to(n).freeze()), false), + Err(e) => (Err(e), true), + }; + + if body_send.send(out).await.is_err() { + warn!("Receiver went away during git http-backend call"); + end = true; + } + + if end { + break; + } } - let out = child - .wait_with_output() - .await - .context("Failed to read git http-backend response")?; - let resp = cgi_to_response(&out.stdout)?; + print_status(&mut child, &mut stderr).await; +} - if !out.stderr.is_empty() { - warn!( - "Git returned an error: `{}`", - String::from_utf8_lossy(&out.stderr) - ); +/// Prints the exit status of the `git` subprocess. +async fn print_status(child: &mut Child, stderr: &mut ChildStderr) { + match tokio::try_join!(child.wait(), read_stderr(stderr)) { + Ok((status, stderr)) if status.success() => { + debug!(stderr, "git http-backend successfully shutdown") + } + Ok((status, stderr)) => error!(stderr, "git http-backend exited with status code {status}"), + Err(e) => error!("Failed to wait on git http-backend shutdown: {e}"), } +} - Ok(resp) +/// Reads the entirety of stderr for the given handle. +async fn read_stderr(stderr: &mut ChildStderr) -> io::Result { + let mut stderr_out = Vec::new(); + stderr.read_to_end(&mut stderr_out).await?; + Ok(String::from_utf8_lossy(&stderr_out).into_owned()) } +/// Extracts a single header (`header`) from the `input` and passes it as `env` to +/// `output`. fn extract_header(input: &HeaderMap, output: &mut Command, header: &str, env: &str) -> Result<()> { if let Some(value) = input.get(header) { output.env(env, value.to_str().context("Invalid header")?); @@ -88,6 +163,7 @@ fn extract_header(input: &HeaderMap, output: &mut Command, header: &str, env: &s Ok(()) } +/// Extract the path from the URL to determine the repository path. fn extract_path<'a>(uri: &'a Uri, repository: &Path) -> Result<&'a str> { let path = uri.path(); let path = path.strip_prefix('/').unwrap_or(path); @@ -99,17 +175,17 @@ fn extract_path<'a>(uri: &'a Uri, repository: &Path) -> Result<&'a str> { } } -// https://en.wikipedia.org/wiki/Common_Gateway_Interface -pub fn cgi_to_response(buffer: &[u8]) -> Result { +// Intercept headers from the spawned `git http-backend` CGI and rewrite them to +// an `axum::Response`. +pub fn parse_cgi_headers(buffer: &[u8]) -> Result)>, anyhow::Error> { let mut headers = [httparse::EMPTY_HEADER; 10]; let (body_offset, headers) = match httparse::parse_headers(buffer, &mut headers)? { Status::Complete(v) => v, - Status::Partial => bail!("Git returned a partial response over CGI"), + Status::Partial => return Ok(None), }; - let mut response = Response::new(boxed(Body::from(buffer[body_offset..].to_vec()))); + let mut response = Response::new(()); - // TODO: extract status header for header in headers { response.headers_mut().insert( HeaderName::from_str(header.name) @@ -131,5 +207,5 @@ pub fn cgi_to_response(buffer: &[u8]) -> Result { } } - Ok(response) + Ok(Some((body_offset, response))) }