Add prometheus instrumentation for coprocessor #35

Merged · 1 commit · Sep 25, 2024

399 changes: 394 additions & 5 deletions fhevm-engine/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions fhevm-engine/Cargo.toml
@@ -12,6 +12,7 @@ sha3 = "0.10.8"
anyhow = "1.0.86"
daggy = "0.8.0"
serde = "1.0.210"
prometheus = "0.13.4"

[profile.dev.package.tfhe]
overflow-checks = false
2 changes: 2 additions & 0 deletions fhevm-engine/coprocessor/Cargo.toml
@@ -31,6 +31,8 @@ fhevm-engine-common = { path = "../fhevm-engine-common" }
strum = { version = "0.26", features = ["derive"] }
bincode.workspace = true
sha3.workspace = true
prometheus.workspace = true
actix-web = "4.9.0"

[dev-dependencies]
testcontainers = "0.21"
10 changes: 6 additions & 4 deletions fhevm-engine/coprocessor/Makefile
@@ -1,4 +1,6 @@

DB_URL ?= DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor

.PHONY: build
build:
cargo build
@@ -11,9 +13,9 @@ cleanup:
init_db:
docker compose up -d
sleep 3
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor sqlx db create
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor sqlx migrate run
DATABASE_URL=postgres://postgres:[email protected]:5432/coprocessor cargo test setup_test_user -- --nocapture --ignored
$(DB_URL) sqlx db create
$(DB_URL) sqlx migrate run
$(DB_URL) cargo test setup_test_user -- --nocapture --ignored

.PHONY: recreate_db
recreate_db:
@@ -23,4 +25,4 @@ recreate_db:
.PHONY: clean_run
clean_run:
$(MAKE) recreate_db
RUST_BACKTRACE=1 cargo run -- --run-server --run-bg-worker
RUST_BACKTRACE=1 $(DB_URL) cargo run --release -- --run-server --run-bg-worker
6 changes: 5 additions & 1 deletion fhevm-engine/coprocessor/src/cli.rs
@@ -32,7 +32,7 @@ pub struct Args {
pub tenant_key_cache_size: i32,

/// Maximum compact inputs to upload
#[arg(long, default_value_t = 8)]
#[arg(long, default_value_t = 10)]
pub maximimum_compact_inputs_upload: usize,

/// Maximum compact inputs to upload
@@ -55,6 +55,10 @@ pub struct Args {
#[arg(long, default_value = "127.0.0.1:50051")]
pub server_addr: String,

/// Prometheus metrics server address
#[arg(long, default_value = "0.0.0.0:9100")]
pub metrics_addr: String,

/// Postgres database url. If unspecified DATABASE_URL environment variable is used
#[arg(long)]
pub database_url: Option<String>,
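
Note on the new flag: --metrics-addr defaults to 0.0.0.0:9100, and as the main.rs change and the test setup further down show, passing an empty string disables the metrics server. A minimal sketch of the flag in isolation, assuming the same clap derive API that cli.rs already uses (a standalone, hypothetical Args struct, not the PR's):

use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Prometheus metrics server address; an empty string disables the metrics server
    #[arg(long, default_value = "0.0.0.0:9100")]
    metrics_addr: String,
}

fn main() {
    // e.g. `coprocessor --metrics-addr 127.0.0.1:9200`
    let args = Args::parse();
    if args.metrics_addr.is_empty() {
        println!("metrics server disabled");
    } else {
        println!("metrics will be served at {}", args.metrics_addr);
    }
}
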
6 changes: 6 additions & 0 deletions fhevm-engine/coprocessor/src/main.rs
@@ -9,6 +9,7 @@ mod tests;
mod tfhe_worker;
mod types;
mod utils;
mod metrics;

fn main() {
let args = crate::cli::parse_args();
@@ -70,6 +71,11 @@ async fn async_main(
set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone()));
}

if !args.metrics_addr.is_empty() {
println!("Initializing metrics server");
set.spawn(crate::metrics::run_metrics_server(args.clone()));
}

if set.is_empty() {
panic!("No tasks specified to run");
}
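
The metrics server is spawned as one more task next to the gRPC server and the TFHE worker, and is skipped when metrics_addr is empty. A rough sketch of that spawning pattern, under the assumption that set is a tokio::task::JoinSet (its concrete type is not visible in this hunk):

use tokio::task::JoinSet;

type TaskResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

// Stand-in for run_server / run_tfhe_worker / run_metrics_server.
async fn serve(name: &'static str) -> TaskResult {
    println!("{name} running");
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut set: JoinSet<TaskResult> = JoinSet::new();
    set.spawn(serve("grpc server"));
    set.spawn(serve("metrics server"));

    if set.is_empty() {
        panic!("No tasks specified to run");
    }

    // Surface the first task that finishes or fails.
    while let Some(joined) = set.join_next().await {
        if let Err(e) = joined.expect("task panicked") {
            eprintln!("task failed: {e:?}");
            break;
        }
    }
}
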
27 changes: 27 additions & 0 deletions fhevm-engine/coprocessor/src/metrics.rs
@@ -0,0 +1,27 @@
async fn metrics() -> impl actix_web::Responder {
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode_to_string(&metric_families).expect("can't encode metrics")
}

async fn healthcheck() -> impl actix_web::Responder {
"OK"
}

pub async fn run_metrics_server(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("metrics server listening at {}", args.metrics_addr);
let _ = actix_web::HttpServer::new(|| {
actix_web::App::new()
.route("/metrics", actix_web::web::to(metrics))
.route("/health", actix_web::web::to(healthcheck))
})
.bind(&args.metrics_addr)
.expect("can't bind to metrics server address")
.workers(1)
.run()
.await?;

Ok(())
}
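
For context on what the new /metrics route serves: anything registered through the prometheus crate's default registry is returned by prometheus::gather(), so the handler above simply text-encodes the whole registry. A standalone sketch of that same encode path, using a hypothetical counter name that is not part of this PR:

use prometheus::{register_int_counter, TextEncoder};

fn main() {
    // Registering adds the metric to the global default registry.
    let example = register_int_counter!(
        "coprocessor_example_total",
        "hypothetical counter used only for this illustration"
    )
    .unwrap();
    example.inc();

    // Mirrors what the metrics() handler above does.
    let families = prometheus::gather();
    let body = TextEncoder::new()
        .encode_to_string(&families)
        .expect("can't encode metrics");

    // Prints something like:
    // # HELP coprocessor_example_total hypothetical counter used only for this illustration
    // # TYPE coprocessor_example_total counter
    // coprocessor_example_total 1
    println!("{body}");
}

Pointing a Prometheus scrape job at the default 0.0.0.0:9100 address would expose the coprocessor_* counters added in server.rs and tfhe_worker.rs in this same text format.
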
88 changes: 77 additions & 11 deletions fhevm-engine/coprocessor/src/server.rs
@@ -21,10 +21,12 @@ use fhevm_engine_common::tfhe_ops::{
try_expand_ciphertext_list, validate_fhe_type,
};
use fhevm_engine_common::types::{FhevmError, SupportedFheCiphertexts, SupportedFheOperations};
use prometheus::{register_int_counter, IntCounter};
use sha3::{Digest, Keccak256};
use sqlx::{query, Acquire};
use tokio::task::spawn_blocking;
use tonic::transport::Server;
use lazy_static::lazy_static;

pub mod common {
tonic::include_proto!("fhevm.common");
@@ -34,6 +36,25 @@ pub mod coprocessor {
tonic::include_proto!("fhevm.coprocessor");
}

lazy_static! {
static ref UPLOAD_INPUTS_COUNTER: IntCounter =
register_int_counter!("coprocessor_upload_inputs_count", "grpc calls for inputs upload endpoint").unwrap();
static ref UPLOAD_INPUTS_ERRORS: IntCounter =
register_int_counter!("coprocessor_upload_inputs_errors", "grpc errors while calling upload inputs").unwrap();
static ref ASYNC_COMPUTE_COUNTER: IntCounter =
register_int_counter!("coprocessor_async_compute_count", "grpc calls for async compute endpoint").unwrap();
static ref ASYNC_COMPUTE_ERRORS: IntCounter =
register_int_counter!("coprocessor_async_compute_errors", "grpc errors while calling async compute").unwrap();
static ref TRIVIAL_ENCRYPT_COUNTER: IntCounter =
register_int_counter!("coprocessor_trivial_encrypt_count", "grpc calls for trivial encrypt endpoint").unwrap();
static ref TRIVIAL_ENCRYPT_ERRORS: IntCounter =
register_int_counter!("coprocessor_trivial_encrypt_errors", "grpc errors while calling trivial encrypt").unwrap();
static ref GET_CIPHERTEXTS_COUNTER: IntCounter =
register_int_counter!("coprocessor_get_ciphertexts_count", "grpc calls for get ciphertexts endpoint").unwrap();
static ref GET_CIPHERTEXTS_ERRORS: IntCounter =
register_int_counter!("coprocessor_get_ciphertexts_errors", "grpc errors while calling get ciphertexts").unwrap();
}

pub struct CoprocessorService {
pool: sqlx::Pool<sqlx::Postgres>,
args: crate::cli::Args,
@@ -140,6 +161,58 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
&self,
request: tonic::Request<InputUploadBatch>,
) -> std::result::Result<tonic::Response<InputUploadResponse>, tonic::Status> {
UPLOAD_INPUTS_COUNTER.inc();
self.upload_inputs_impl(request).await.inspect_err(|_| {
UPLOAD_INPUTS_ERRORS.inc();
})
}

async fn async_compute(
&self,
request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
ASYNC_COMPUTE_COUNTER.inc();
self.async_compute_impl(request).await.inspect_err(|_| {
ASYNC_COMPUTE_ERRORS.inc();
})
}

async fn wait_computations(
&self,
_request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::FhevmResponses>, tonic::Status> {
return Err(tonic::Status::unimplemented("not implemented"));
}

async fn trivial_encrypt_ciphertexts(
&self,
request: tonic::Request<coprocessor::TrivialEncryptBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
TRIVIAL_ENCRYPT_COUNTER.inc();
self.trivial_encrypt_ciphertexts_impl(request).await.inspect_err(|_| {
TRIVIAL_ENCRYPT_ERRORS.inc()
})
}

async fn get_ciphertexts(
&self,
request: tonic::Request<coprocessor::GetCiphertextBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GetCiphertextResponse>, tonic::Status>
{
GET_CIPHERTEXTS_COUNTER.inc();
self.get_ciphertexts_impl(request).await.inspect_err(|_| {
GET_CIPHERTEXTS_ERRORS.inc();
})
}
}

impl CoprocessorService {
async fn upload_inputs_impl(
&self,
request: tonic::Request<InputUploadBatch>,
) -> std::result::Result<tonic::Response<InputUploadResponse>, tonic::Status> {
UPLOAD_INPUTS_COUNTER.inc();

let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let req = request.get_ref();
@@ -400,7 +473,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
Ok(tonic::Response::new(response))
}

async fn async_compute(
async fn async_compute_impl(
&self,
request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
@@ -534,14 +607,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn wait_computations(
&self,
_request: tonic::Request<coprocessor::AsyncComputeRequest>,
) -> std::result::Result<tonic::Response<coprocessor::FhevmResponses>, tonic::Status> {
return Err(tonic::Status::unimplemented("not implemented"));
}

async fn trivial_encrypt_ciphertexts(
async fn trivial_encrypt_ciphertexts_impl(
&self,
request: tonic::Request<coprocessor::TrivialEncryptBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
@@ -616,7 +682,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn get_ciphertexts(
async fn get_ciphertexts_impl(
&self,
request: tonic::Request<coprocessor::GetCiphertextBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GetCiphertextResponse>, tonic::Status>
@@ -676,4 +742,4 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ

return Ok(tonic::Response::new(result));
}
}
}
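
The server changes follow a single pattern: each public gRPC method keeps its original signature, increments a call counter, and delegates to a renamed *_impl method, using inspect_err to bump the matching error counter without altering the returned status. A minimal sketch of that wrapper pattern with plain functions and hypothetical metric names:

use lazy_static::lazy_static;
use prometheus::{register_int_counter, IntCounter};

lazy_static! {
    static ref CALLS: IntCounter =
        register_int_counter!("example_calls", "calls to the wrapped endpoint").unwrap();
    static ref ERRORS: IntCounter =
        register_int_counter!("example_errors", "errors from the wrapped endpoint").unwrap();
}

// Stands in for one of the *_impl methods carrying the real logic.
fn endpoint_impl(input: i64) -> Result<i64, String> {
    if input < 0 {
        return Err("negative input".to_string());
    }
    Ok(input * 2)
}

// Stands in for the public method: count the call, delegate, and count
// the error without consuming it, which is what inspect_err allows.
fn endpoint(input: i64) -> Result<i64, String> {
    CALLS.inc();
    endpoint_impl(input).inspect_err(|_| {
        ERRORS.inc();
    })
}

fn main() {
    let _ = endpoint(21);
    let _ = endpoint(-1);
    println!("calls={} errors={}", CALLS.get(), ERRORS.get());
}
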
1 change: 1 addition & 0 deletions fhevm-engine/coprocessor/src/tests/utils.rs
@@ -95,6 +95,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
tokio_threads: 2,
pg_pool_max_connections: 2,
server_addr: format!("127.0.0.1:{app_port}"),
metrics_addr: "".to_string(),
database_url: Some(db_url.to_string()),
maximimum_compact_inputs_upload: 10,
coprocessor_private_key: "./coprocessor.key".to_string(),
23 changes: 23 additions & 0 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
@@ -5,18 +5,36 @@ use fhevm_engine_common::{
tfhe_ops::{current_ciphertext_version, perform_fhe_operation},
types::SupportedFheOperations,
};
use prometheus::{register_int_counter, IntCounter};
use sqlx::{postgres::PgListener, query, Acquire};
use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
};
use lazy_static::lazy_static;

lazy_static! {
static ref WORKER_ERRORS_COUNTER: IntCounter =
register_int_counter!("coprocessor_worker_errors", "worker errors encountered").unwrap();
static ref WORK_ITEMS_POLL_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_polls", "times work items are polled from database").unwrap();
static ref WORK_ITEMS_NOTIFICATIONS_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_notifications", "times instant notifications for work items received from the database").unwrap();
static ref WORK_ITEMS_FOUND_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_found", "work items queried from database").unwrap();
static ref WORK_ITEMS_ERRORS_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_errors", "work items errored out during computation").unwrap();
static ref WORK_ITEMS_PROCESSED_COUNTER: IntCounter =
register_int_counter!("coprocessor_work_items_processed", "work items successfully processed and stored in the database").unwrap();
}

pub async fn run_tfhe_worker(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
// here we log the errors and make sure we retry
if let Err(cycle_error) = tfhe_worker_cycle(&args).await {
WORKER_ERRORS_COUNTER.inc();
eprintln!(
"Error in background worker, retrying shortly: {:?}",
cycle_error
@@ -49,9 +67,11 @@ async fn tfhe_worker_cycle(
if !immedially_poll_more_work {
tokio::select! {
_ = listener.try_recv() => {
WORK_ITEMS_NOTIFICATIONS_COUNTER.inc();
println!("Received work_available notification from postgres");
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => {
WORK_ITEMS_POLL_COUNTER.inc();
println!("Polling the database for more work on timer");
},
};
@@ -93,6 +113,7 @@ async fn tfhe_worker_cycle(
continue;
}

WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
println!("Processing {} work items", the_work.len());

// make sure we process each tenant sequentially not to
@@ -248,8 +269,10 @@ async fn tfhe_worker_cycle(
)
.execute(trx.as_mut())
.await?;
WORK_ITEMS_PROCESSED_COUNTER.inc();
}
Err((err, tenant_id, output_handle)) => {
WORKER_ERRORS_COUNTER.inc();
let _ = query!(
"
UPDATE computations
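
The worker metrics above are all monotonic IntCounters; batch-sized observations such as the number of work items fetched per poll are recorded with inc_by rather than a gauge. A small sketch of that call, with a hypothetical metric name:

use lazy_static::lazy_static;
use prometheus::{register_int_counter, IntCounter};

lazy_static! {
    static ref ITEMS_FOUND: IntCounter =
        register_int_counter!("example_items_found", "items fetched per poll").unwrap();
}

fn main() {
    let the_work = vec!["a", "b", "c"]; // stand-in for rows fetched from Postgres
    // Counters only increase, so a whole batch is recorded with inc_by.
    ITEMS_FOUND.inc_by(the_work.len() as u64);
    println!("total items found so far: {}", ITEMS_FOUND.get());
}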