Skip to content

Commit

Permalink
feat: add inputs support for coprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
david-zk authored Aug 25, 2024
1 parent 99b2d6b commit 71984ed
Show file tree
Hide file tree
Showing 15 changed files with 718 additions and 88 deletions.
184 changes: 184 additions & 0 deletions fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ resolver = "2"
members = ["coprocessor", "executor", "fhevm-engine-common"]

[workspace.dependencies]
tfhe = { version = "0.8.0-alpha.2", features = ["boolean", "shortint", "integer", "aarch64-unix"] }
tfhe = { version = "0.8.0-alpha.2", features = ["boolean", "shortint", "integer", "aarch64-unix", "zk-pok"] }
clap = { version = "4.5", features = ["derive"] }
1 change: 1 addition & 0 deletions fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bigdecimal = "0.4"
fhevm-engine-common = { path = "../fhevm-engine-common" }
strum = { version = "0.26", features = ["derive"] }
bincode = "1.3.3"
sha3 = "0.10.8"

[dev-dependencies]
testcontainers = "0.21"
Expand Down
14 changes: 14 additions & 0 deletions fhevm-engine/coprocessor/migrations/20240722111257_coprocessor.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
CREATE TABLE IF NOT EXISTS computations (
tenant_id INT NOT NULL,
output_handle BYTEA NOT NULL,
output_type SMALLINT NOT NULL,
-- can be handle or scalar, depends on is_scalar field
-- only second dependency can ever be scalar
dependencies BYTEA[] NOT NULL,
Expand All @@ -21,10 +22,23 @@ CREATE TABLE IF NOT EXISTS ciphertexts (
ciphertext BYTEA NOT NULL,
ciphertext_version SMALLINT NOT NULL,
ciphertext_type SMALLINT NOT NULL,
-- if ciphertext came from blob we have its reference
input_blob_hash BYTEA,
input_blob_index INT NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (tenant_id, handle, ciphertext_version)
);

-- store for audits and historical reference
CREATE TABLE IF NOT EXISTS input_blobs (
tenant_id INT NOT NULL,
blob_hash BYTEA NOT NULL,
blob_data BYTEA NOT NULL,
blob_ciphertext_count INT NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (tenant_id, blob_hash)
);

CREATE TABLE IF NOT EXISTS tenants (
tenant_id SERIAL PRIMARY KEY,
tenant_api_key UUID NOT NULL DEFAULT gen_random_uuid(),
Expand Down
8 changes: 8 additions & 0 deletions fhevm-engine/coprocessor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ pub struct Args {
#[arg(long, default_value_t = 32)]
pub tenant_key_cache_size: i32,

/// Maximum compact inputs to upload
#[arg(long, default_value_t = 8)]
pub maximimum_compact_inputs_upload: usize,

/// Maximum compact inputs to upload
#[arg(long, default_value_t = 255)]
pub maximum_handles_per_input: u8,

/// Coprocessor FHE processing threads
#[arg(long, default_value_t = 8)]
pub coprocessor_fhe_threads: usize,
Expand Down
85 changes: 79 additions & 6 deletions fhevm-engine/coprocessor/src/db_queries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeSet, HashMap};
use std::str::FromStr;

use crate::types::CoprocessorError;
use crate::types::{CoprocessorError, TfheTenantKeys};
use sqlx::{query, Postgres};

/// Returns tenant id upon valid authorization request
Expand Down Expand Up @@ -55,12 +55,19 @@ pub async fn check_if_ciphertexts_exist_in_db(
) -> Result<HashMap<Vec<u8>, i16>, CoprocessorError> {
let handles_to_check_in_db_vec = cts.iter().cloned().collect::<Vec<_>>();
let ciphertexts = query!(
"
SELECT handle, ciphertext_type
r#"
-- existing computations
SELECT handle AS "handle!", ciphertext_type AS "ciphertext_type!"
FROM ciphertexts
WHERE handle = ANY($1::BYTEA[])
AND tenant_id = $2
",
WHERE tenant_id = $2
AND handle = ANY($1::BYTEA[])
UNION
-- pending computations
SELECT output_handle AS "handle!", output_type AS "ciphertext_type!"
FROM computations
WHERE tenant_id = $2
AND output_handle = ANY($1::BYTEA[])
"#,
&handles_to_check_in_db_vec,
tenant_id,
)
Expand All @@ -86,3 +93,69 @@ pub async fn check_if_ciphertexts_exist_in_db(

Ok(result)
}

pub async fn fetch_tenant_server_key<'a, T>(tenant_id: i32, pool: T, tenant_key_cache: &std::sync::Arc<tokio::sync::RwLock<lru::LruCache<i32, TfheTenantKeys>>>)
-> Result<tfhe::ServerKey, Box<dyn std::error::Error + Send + Sync>>
where T: sqlx::PgExecutor<'a> + Copy
{
// try getting from cache until it succeeds with populating cache
loop {
{
let mut w = tenant_key_cache.write().await;
if let Some(key) = w.get(&tenant_id) {
return Ok(key.sks.clone());
}
}

populate_cache_with_tenant_keys(vec![tenant_id], pool, &tenant_key_cache).await?;
}
}

pub async fn query_tenant_keys<'a, T>(tenants_to_query: Vec<i32>, conn: T)
-> Result<Vec<TfheTenantKeys>, Box<dyn std::error::Error + Send + Sync>>
where T: sqlx::PgExecutor<'a>
{
let mut res = Vec::with_capacity(tenants_to_query.len());
let keys = query!(
"
SELECT tenant_id, pks_key, sks_key
FROM tenants
WHERE tenant_id = ANY($1::INT[])
",
&tenants_to_query
)
.fetch_all(conn)
.await?;

for key in keys {
let sks: tfhe::ServerKey = bincode::deserialize(&key.sks_key)
.expect("We can't deserialize our own validated sks key");
let pks: tfhe::CompactPublicKey = bincode::deserialize(&key.pks_key)
.expect("We can't deserialize our own validated pks key");
res.push(TfheTenantKeys { tenant_id: key.tenant_id, sks, pks });
}

Ok(res)
}

pub async fn populate_cache_with_tenant_keys<'a, T>(tenants_to_query: Vec<i32>, conn: T, tenant_key_cache: &std::sync::Arc<tokio::sync::RwLock<lru::LruCache<i32, TfheTenantKeys>>>)
-> Result<(), Box<dyn std::error::Error + Send + Sync>>
where T: sqlx::PgExecutor<'a>
{
if !tenants_to_query.is_empty() {
let keys = query_tenant_keys(tenants_to_query, conn).await?;

assert!(
keys.len() > 0,
"We should have keys here, otherwise our database is corrupt"
);

let mut key_cache = tenant_key_cache.write().await;

for key in keys {
key_cache.put(key.tenant_id, key);
}
}

Ok(())
}
Loading

0 comments on commit 71984ed

Please sign in to comment.