Skip to content

Commit

Permalink
Rewrite the webhook receiver in Rust
Browse files Browse the repository at this point in the history
  • Loading branch information
dasJ committed Dec 23, 2024
1 parent 059463b commit 671fa93
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 3 deletions.
24 changes: 22 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions config.public.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
{
"github_webhook_receiver": {
"listen": "[::1]:9899",
"webhook_secret_file: "/run/secrets/ofborg/github-webhook-secret",
"rabbitmq": {
"host": "localhost",
"ssl": false,
"username": "ofborg-github-webhook",
"password_file": "/run/secrets/ofborg/github-webhook-amqp-password",
"virtualhost": "ofborg"
}
},
"feedback": {
"full_logs": true
},
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
test -e $out/bin/builder
test -e $out/bin/github_comment_filter
test -e $out/bin/github_comment_poster
test -e $out/bin/github_webhook_receiver
test -e $out/bin/log_message_collector
test -e $out/bin/evaluation_filter
'';
Expand Down
3 changes: 3 additions & 0 deletions ofborg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
uuid = { version = "1.2", features = ["v4"] }
rustls-pemfile = "1.0.2"
hmac = "0.12.1"
sha2 = "0.10.8"
hex = "0.4.3"
207 changes: 207 additions & 0 deletions ofborg/src/bin/github-webhook-receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use std::env;
use std::error::Error;
use std::io::Read as _;
use std::sync::Arc;
#[macro_use]
extern crate hyper;

use async_std::task;
use hmac::{Hmac, Mac};
use hyper::header;
use hyper::{
server::{Request, Response, Server},
status::StatusCode,
};
use lapin::options::BasicPublishOptions;
use lapin::{BasicProperties, Channel};
use ofborg::ghevent::GenericWebhook;
use ofborg::{config, easyamqp, easyamqp::ChannelExt, easylapin};
use sha2::Sha256;
use tracing::{error, info, warn};

header! { (XHubSignature256, "X-Hub-Signature-256") => [String] }
header! { (XGithubEvent, "X-Github-Event") => [String] }

/// Prepares the the exchange we will write to, the queues that are bound to it
/// and binds them.
fn setup_amqp(chan: &mut Channel) -> Result<(), Box<dyn Error>> {
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "github-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;

let queue_name = String::from("build-inputs");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "github-events".to_owned(),
routing_key: Some(String::from("issue_comment.*")),
no_wait: false,
})?;

let queue_name = String::from("github-events-unknown");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "github-events".to_owned(),
routing_key: Some(String::from("unknown.*")),
no_wait: false,
})?;

let queue_name = String::from("mass-rebuild-check-inputs");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "github-events".to_owned(),
routing_key: Some(String::from("pull_request.nixos/nixpkgs")),
no_wait: false,
})?;
Ok(())
}

fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args()
.nth(1)
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let Some(cfg) = config::load(arg.as_ref()).github_webhook_receiver else {
error!("No GitHub Webhook configuration found!");
panic!();
};

let webhook_secret = std::fs::read_to_string(cfg.webhook_secret_file)
.expect("Unable to read webhook secret file");
let webhook_secret = Arc::new(webhook_secret.trim().to_string());

let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
setup_amqp(&mut chan)?;

//let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?);
let threads = std::thread::available_parallelism()
.map(|x| x.get())
.unwrap_or(1);
info!("Will listen on {} with {threads} threads", cfg.listen);
Server::http(cfg.listen)?.handle_threads(
move |mut req: Request, mut res: Response| {
// HTTP 405
if req.method != hyper::Post {
*res.status_mut() = StatusCode::MethodNotAllowed;
return;
}
let hdr = req.headers.clone();

// Read body
let mut raw = Vec::new();
if req.read_to_end(&mut raw).is_err() {
warn!("Failed to read body from client");
*res.status_mut() = StatusCode::InternalServerError;
return;
}
let raw = raw.as_slice();

// Validate signature
{
let Some(sig) = hdr.get::<XHubSignature256>() else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Missing signature header");
return;
};
let mut components = sig.splitn(2, '=');
let Some(algo) = components.next() else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Signature hash method missing");
return;
};
let Some(hash) = components.next() else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Signature hash missing");
return;
};
let Ok(hash) = hex::decode(hash) else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Invalid signature hash hex");
return;
};

if algo != "sha256" {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Invalid signature hash method");
return;
}

let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(webhook_secret.as_bytes()) else {
*res.status_mut() = StatusCode::InternalServerError;
error!("Unable to create HMAC from secret");
return;
};
mac.update(raw);
if mac.verify_slice(hash.as_slice()).is_err() {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Signature verification failed");
return;
}
}

// Parse body
let Ok(input) = serde_json::from_slice::<GenericWebhook>(raw) else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Invalid JSON");
error!("Invalid JSON received");
return;
};

// Build routing key
let Some(event_type) = hdr.get::<XGithubEvent>() else {
*res.status_mut() = StatusCode::BadRequest;
let _ = res.send(b"Missing event type");
return;
};
let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase());

// Publish message
let _confirmation = task::block_on(async {
chan.basic_publish(
"github-events",
&routing_key,
BasicPublishOptions::default(),
raw,
BasicProperties::default()
.with_content_type("application/json".into())
.with_delivery_mode(2), // persistent
)
.await
});
*res.status_mut() = StatusCode::NoContent;
},
threads,
)?;
Ok(())
}
14 changes: 14 additions & 0 deletions ofborg/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use tracing::{debug, error, info, warn};

#[derive(Serialize, Deserialize, Debug)]
pub struct Config {
/// Configuration for the webhook receiver
pub github_webhook_receiver: Option<GithubWebhookConfig>,
pub runner: RunnerConfig,
pub feedback: FeedbackConfig,
pub checkout: CheckoutConfig,
Expand All @@ -24,6 +26,18 @@ pub struct Config {
pub log_storage: Option<LogStorage>,
}

/// Configuration for the webhook receiver
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GithubWebhookConfig {
/// Listen host/port
pub listen: String,
/// Path to the GitHub webhook secret
pub webhook_secret_file: String,
/// RabbitMQ broker to connect to
pub rabbitmq: RabbitMqConfig,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct FeedbackConfig {
pub full_logs: bool,
Expand Down
8 changes: 8 additions & 0 deletions ofborg/src/ghevent/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@ pub struct Repository {
pub struct Issue {
pub number: u64,
}

/// A generic webhook that we received with minimal verification, only for handling in the GitHub
/// webhook receiver.
#[derive(Serialize, Deserialize, Debug)]
pub struct GenericWebhook {
/// The repository the event originated
pub repository: Repository,
}
2 changes: 1 addition & 1 deletion ofborg/src/ghevent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod common;
mod issuecomment;
mod pullrequestevent;

pub use self::common::{Comment, Issue, Repository, User};
pub use self::common::{Comment, GenericWebhook, Issue, Repository, User};
pub use self::issuecomment::{IssueComment, IssueCommentAction};
pub use self::pullrequestevent::{
PullRequest, PullRequestAction, PullRequestEvent, PullRequestState,
Expand Down

0 comments on commit 671fa93

Please sign in to comment.