Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coprocessor edge case testing #12

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Git LFS file not shown
6 changes: 3 additions & 3 deletions fhevm-engine/coprocessor/migrations/gen-keys.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ echo "
INSERT INTO tenants(tenant_api_key, pks_key, sks_key, cks_key)
VALUES (
'a1503fb6-d79b-4e9e-826d-44cf262f3e05',
decode('$(cat ../fhevm-keys/pks | xxd -p | tr -d '\n')','hex'),
decode('$(cat ../fhevm-keys/sks | xxd -p | tr -d '\n')','hex'),
decode('$(cat ../fhevm-keys/cks | xxd -p | tr -d '\n')','hex')
decode('$(cat ../../fhevm-keys/pks | xxd -p | tr -d '\n')','hex'),
decode('$(cat ../../fhevm-keys/sks | xxd -p | tr -d '\n')','hex'),
decode('$(cat ../../fhevm-keys/cks | xxd -p | tr -d '\n')','hex')
)
" > 20240723111257_coprocessor.sql
344 changes: 173 additions & 171 deletions fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,143 +63,12 @@ pub async fn run_server(

#[tonic::async_trait]
impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorService {
async fn debug_encrypt_ciphertext(
&self,
request: tonic::Request<coprocessor::DebugEncryptRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;
let req = request.get_ref();

let mut public_key = sqlx::query!(
"
SELECT sks_key
FROM tenants
WHERE tenant_id = $1
",
tenant_id
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

assert_eq!(public_key.len(), 1);

let public_key = public_key.pop().unwrap();

let cloned = req.values.clone();
let out_cts = tokio::task::spawn_blocking(move || {
let server_key: tfhe::ServerKey = bincode::deserialize(&public_key.sks_key).unwrap();
tfhe::set_server_key(server_key);

// single threaded implementation as this is debug function and it is simple to implement
let mut res: Vec<(Vec<u8>, i16, Vec<u8>)> = Vec::with_capacity(cloned.len());
for v in cloned {
let ct = debug_trivial_encrypt_be_bytes(v.output_type as i16, &v.le_value);
let (ct_type, ct_bytes) = ct.serialize();
res.push((v.handle, ct_type, ct_bytes));
}

res
})
.await
.unwrap();

let mut conn = self
.pool
.acquire()
.await
.map_err(Into::<CoprocessorError>::into)?;
let mut trx = conn.begin().await.map_err(Into::<CoprocessorError>::into)?;

for (handle, db_type, db_bytes) in out_cts {
sqlx::query!("
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
VALUES ($1, $2, $3, $4, $5)
",
tenant_id, handle, db_bytes, current_ciphertext_version(), db_type as i16
)
.execute(trx.as_mut()).await.map_err(Into::<CoprocessorError>::into)?;
}

trx.commit().await.map_err(Into::<CoprocessorError>::into)?;

return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn debug_decrypt_ciphertext(
&self,
request: tonic::Request<coprocessor::DebugDecryptRequest>,
) -> std::result::Result<tonic::Response<coprocessor::DebugDecryptResponse>, tonic::Status>
{
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;
let req = request.get_ref();

let mut priv_key = sqlx::query!(
"
SELECT cks_key
FROM tenants
WHERE tenant_id = $1
",
tenant_id
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

if priv_key.is_empty() || priv_key[0].cks_key.is_none() {
return Err(tonic::Status::not_found("tenant private key not found"));
}

assert_eq!(priv_key.len(), 1);

let cts = sqlx::query!(
"
SELECT ciphertext, ciphertext_type, handle
FROM ciphertexts
WHERE tenant_id = $1
AND handle = ANY($2::BYTEA[])
AND ciphertext_version = $3
",
tenant_id,
&req.handles,
current_ciphertext_version()
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

if cts.is_empty() {
return Err(tonic::Status::not_found("ciphertext not found"));
}

let priv_key = priv_key.pop().unwrap().cks_key.unwrap();

let values = tokio::task::spawn_blocking(move || {
let client_key: tfhe::ClientKey = bincode::deserialize(&priv_key).unwrap();

let mut decrypted: Vec<DebugDecryptResponseSingle> = Vec::with_capacity(cts.len());
for ct in cts {
let deserialized =
deserialize_fhe_ciphertext(ct.ciphertext_type, &ct.ciphertext)
.unwrap();
decrypted.push(DebugDecryptResponseSingle {
output_type: ct.ciphertext_type as i32,
value: deserialized.decrypt(&client_key),
});
}

decrypted
})
.await
.unwrap();

return Ok(tonic::Response::new(DebugDecryptResponse { values }));
}

async fn upload_inputs(
&self,
request: tonic::Request<InputUploadBatch>,
) -> std::result::Result<tonic::Response<InputUploadResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let req = request.get_ref();
if req.input_ciphertexts.len() > self.args.maximimum_compact_inputs_upload {
return Err(tonic::Status::from_error(Box::new(
Expand All @@ -217,8 +86,6 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
return Ok(tonic::Response::new(response));
}

let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let server_key = {
fetch_tenant_server_key(tenant_id, &self.pool, &self.tenant_key_cache)
.await
Expand Down Expand Up @@ -335,42 +202,6 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
Ok(tonic::Response::new(response))
}

async fn upload_ciphertexts(
&self,
request: tonic::Request<coprocessor::CiphertextUploadBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let req = request.get_ref();

// TODO: check if ciphertext deserializes into type correctly
// TODO: check for duplicate handles in the input
// TODO: check if ciphertext doesn't exist already
// TODO: if ciphertexts exists check that it is equal to the one being uploaded

let mut trx = self
.pool
.begin()
.await
.map_err(Into::<CoprocessorError>::into)?;
for i_ct in &req.input_ciphertexts {
let ciphertext_type: i16 = i_ct
.ciphertext_type
.try_into()
.map_err(|_e| CoprocessorError::FhevmError(FhevmError::UnknownFheType(i_ct.ciphertext_type)))?;
let _ = sqlx::query!("
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
VALUES($1, $2, $3, $4, $5)
ON CONFLICT (tenant_id, handle, ciphertext_version) DO NOTHING
", tenant_id, i_ct.ciphertext_handle, i_ct.ciphertext_bytes, current_ciphertext_version(), ciphertext_type)
.execute(trx.as_mut()).await.map_err(Into::<CoprocessorError>::into)?;
}

trx.commit().await.map_err(Into::<CoprocessorError>::into)?;

return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn async_compute(
&self,
request: tonic::Request<coprocessor::AsyncComputeRequest>,
Expand Down Expand Up @@ -504,4 +335,175 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ
) -> std::result::Result<tonic::Response<coprocessor::FhevmResponses>, tonic::Status> {
return Err(tonic::Status::unimplemented("not implemented"));
}

// debug functions below, should be removed in production
async fn debug_encrypt_ciphertext(
&self,
request: tonic::Request<coprocessor::DebugEncryptRequest>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;
let req = request.get_ref();

let mut public_key = sqlx::query!(
"
SELECT sks_key
FROM tenants
WHERE tenant_id = $1
",
tenant_id
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

assert_eq!(public_key.len(), 1);

let public_key = public_key.pop().unwrap();

let cloned = req.values.clone();
let out_cts = tokio::task::spawn_blocking(move || {
let server_key: tfhe::ServerKey = bincode::deserialize(&public_key.sks_key).unwrap();
tfhe::set_server_key(server_key);

// single threaded implementation as this is debug function and it is simple to implement
let mut res: Vec<(Vec<u8>, i16, Vec<u8>)> = Vec::with_capacity(cloned.len());
for v in cloned {
let ct = debug_trivial_encrypt_be_bytes(v.output_type as i16, &v.le_value);
let (ct_type, ct_bytes) = ct.serialize();
res.push((v.handle, ct_type, ct_bytes));
}

res
})
.await
.unwrap();

let mut conn = self
.pool
.acquire()
.await
.map_err(Into::<CoprocessorError>::into)?;
let mut trx = conn.begin().await.map_err(Into::<CoprocessorError>::into)?;

for (handle, db_type, db_bytes) in out_cts {
sqlx::query!("
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
VALUES ($1, $2, $3, $4, $5)
",
tenant_id, handle, db_bytes, current_ciphertext_version(), db_type as i16
)
.execute(trx.as_mut()).await.map_err(Into::<CoprocessorError>::into)?;
}

trx.commit().await.map_err(Into::<CoprocessorError>::into)?;

return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn debug_decrypt_ciphertext(
&self,
request: tonic::Request<coprocessor::DebugDecryptRequest>,
) -> std::result::Result<tonic::Response<coprocessor::DebugDecryptResponse>, tonic::Status>
{
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;
let req = request.get_ref();

let mut priv_key = sqlx::query!(
"
SELECT cks_key
FROM tenants
WHERE tenant_id = $1
",
tenant_id
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

if priv_key.is_empty() || priv_key[0].cks_key.is_none() {
return Err(tonic::Status::not_found("tenant private key not found"));
}

assert_eq!(priv_key.len(), 1);

let cts = sqlx::query!(
"
SELECT ciphertext, ciphertext_type, handle
FROM ciphertexts
WHERE tenant_id = $1
AND handle = ANY($2::BYTEA[])
AND ciphertext_version = $3
",
tenant_id,
&req.handles,
current_ciphertext_version()
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

if cts.is_empty() {
return Err(tonic::Status::not_found("ciphertext not found"));
}

let priv_key = priv_key.pop().unwrap().cks_key.unwrap();

let values = tokio::task::spawn_blocking(move || {
let client_key: tfhe::ClientKey = bincode::deserialize(&priv_key).unwrap();

let mut decrypted: Vec<DebugDecryptResponseSingle> = Vec::with_capacity(cts.len());
for ct in cts {
let deserialized =
deserialize_fhe_ciphertext(ct.ciphertext_type, &ct.ciphertext)
.unwrap();
decrypted.push(DebugDecryptResponseSingle {
output_type: ct.ciphertext_type as i32,
value: deserialized.decrypt(&client_key),
});
}

decrypted
})
.await
.unwrap();

return Ok(tonic::Response::new(DebugDecryptResponse { values }));
}

async fn upload_ciphertexts(
&self,
request: tonic::Request<coprocessor::CiphertextUploadBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GenericResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;

let req = request.get_ref();

// TODO: check if ciphertext deserializes into type correctly
// TODO: check for duplicate handles in the input
// TODO: check if ciphertext doesn't exist already
// TODO: if ciphertexts exists check that it is equal to the one being uploaded

let mut trx = self
.pool
.begin()
.await
.map_err(Into::<CoprocessorError>::into)?;
for i_ct in &req.input_ciphertexts {
let ciphertext_type: i16 = i_ct
.ciphertext_type
.try_into()
.map_err(|_e| CoprocessorError::FhevmError(FhevmError::UnknownFheType(i_ct.ciphertext_type)))?;
let _ = sqlx::query!("
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
VALUES($1, $2, $3, $4, $5)
ON CONFLICT (tenant_id, handle, ciphertext_version) DO NOTHING
", tenant_id, i_ct.ciphertext_handle, i_ct.ciphertext_bytes, current_ciphertext_version(), ciphertext_type)
.execute(trx.as_mut()).await.map_err(Into::<CoprocessorError>::into)?;
}

trx.commit().await.map_err(Into::<CoprocessorError>::into)?;

return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

}
Loading