Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint to get ciphertexts from coprocessor in batch #31

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fhevm-engine/coprocessor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub struct Args {
#[arg(long, default_value_t = 5000)]
pub server_maximum_ciphertexts_to_schedule: usize,

/// Server maximum ciphertexts to serve on get_cihpertexts endpoint
#[arg(long, default_value_t = 5000)]
pub server_maximum_ciphertexts_to_get: usize,

/// Work items batch size
#[arg(long, default_value_t = 10)]
pub work_items_batch_size: i32,
Expand Down
63 changes: 62 additions & 1 deletion fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use alloy::signers::SignerSync;
use alloy::sol_types::SolStruct;
use coprocessor::async_computation_input::Input;
use coprocessor::{
InputCiphertextResponse, InputCiphertextResponseHandle, InputUploadBatch, InputUploadResponse,
FetchedCiphertext, GetCiphertextSingleResponse, InputCiphertextResponse, InputCiphertextResponseHandle, InputUploadBatch, InputUploadResponse
};
use fhevm_engine_common::tfhe_ops::{
check_fhe_operand_types, current_ciphertext_version, trivial_encrypt_be_bytes,
Expand Down Expand Up @@ -563,4 +563,65 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ

return Ok(tonic::Response::new(GenericResponse { response_code: 0 }));
}

async fn get_ciphertexts(
&self,
request: tonic::Request<coprocessor::GetCiphertextBatch>,
) -> std::result::Result<tonic::Response<coprocessor::GetCiphertextResponse>, tonic::Status> {
let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?;
let req = request.get_ref();

if req.handles.len() > self.args.server_maximum_ciphertexts_to_get {
return Err(tonic::Status::from_error(Box::new(
CoprocessorError::MoreThanMaximumCiphertextsAttemptedToDownload {
input_count: req.handles.len(),
maximum_allowed: self.args.server_maximum_ciphertexts_to_get,
},
)));
}

let mut result = coprocessor::GetCiphertextResponse { responses: Vec::new() };
let mut set = BTreeSet::new();

for h in &req.handles {
let _ = set.insert(h.clone());
}

let cts: Vec<Vec<u8>> = set.into_iter().collect();

let db_cts = query!(
"
SELECT handle, ciphertext_type, ciphertext_version, ciphertext
FROM ciphertexts
WHERE tenant_id = $1
AND handle = ANY($2::BYTEA[])
",
tenant_id, &cts
)
.fetch_all(&self.pool)
.await
.map_err(Into::<CoprocessorError>::into)?;

let mut the_map: BTreeMap<Vec<u8>, _> = BTreeMap::new();
for ct in db_cts {
let _ = the_map.insert(ct.handle.clone(), ct);
}

for h in &req.handles {
result.responses.push(
GetCiphertextSingleResponse {
handle: h.clone(),
ciphertext: the_map.get(h).map(|res| {
FetchedCiphertext {
ciphertext_bytes: res.ciphertext.clone(),
ciphertext_type: res.ciphertext_type as i32,
ciphertext_version: res.ciphertext_version as i32
}
})
}
);
}

return Ok(tonic::Response::new(result));
}
}
37 changes: 35 additions & 2 deletions fhevm-engine/coprocessor/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use crate::server::common::FheOperation;
use crate::server::coprocessor::async_computation_input::Input;
use crate::server::coprocessor::fhevm_coprocessor_client::FhevmCoprocessorClient;
use crate::server::coprocessor::{
AsyncComputation, AsyncComputationInput, AsyncComputeRequest, TrivialEncryptBatch,
TrivialEncryptRequestSingle,
AsyncComputation, AsyncComputationInput, AsyncComputeRequest, GetCiphertextBatch, TrivialEncryptBatch, TrivialEncryptRequestSingle
};
use tonic::metadata::MetadataValue;
use utils::{default_api_key, decrypt_ciphertexts, random_handle, wait_until_all_ciphertexts_computed};
Expand Down Expand Up @@ -33,6 +32,8 @@ async fn test_smoke() -> Result<(), Box<dyn std::error::Error>> {
let h2 = random_handle().to_be_bytes();
let h3 = random_handle().to_be_bytes();
let h4 = random_handle().to_be_bytes();
// unused, non existing
let h5 = random_handle().to_be_bytes();

// encrypt two ciphertexts
{
Expand Down Expand Up @@ -113,6 +114,38 @@ async fn test_smoke() -> Result<(), Box<dyn std::error::Error>> {
assert_eq!(resp[1].output_type, ct_type as i16);
}

// compute
{
let mut get_cts_req = tonic::Request::new(GetCiphertextBatch {
handles: vec![
h1.to_vec(),
h2.to_vec(),
h3.to_vec(),
h4.to_vec(),
h5.to_vec(),
],
});
get_cts_req.metadata_mut().append(
"authorization",
MetadataValue::from_str(&api_key_header).unwrap(),
);
let resp = client.get_ciphertexts(get_cts_req).await?;
let output = resp.get_ref();
assert_eq!(output.responses.len(), 5);

assert_eq!(output.responses[0].handle, h1);
assert_eq!(output.responses[1].handle, h2);
assert_eq!(output.responses[2].handle, h3);
assert_eq!(output.responses[3].handle, h4);
assert_eq!(output.responses[4].handle, h5);

assert!(output.responses[0].ciphertext.is_some());
assert!(output.responses[1].ciphertext.is_some());
assert!(output.responses[2].ciphertext.is_some());
assert!(output.responses[3].ciphertext.is_some());
assert!(output.responses[4].ciphertext.is_none());
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions fhevm-engine/coprocessor/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
run_server: true,
generate_fhe_keys: false,
server_maximum_ciphertexts_to_schedule: 5000,
server_maximum_ciphertexts_to_get: 5000,
work_items_batch_size: 40,
tenant_key_cache_size: 4,
coprocessor_fhe_threads: 4,
Expand Down
10 changes: 10 additions & 0 deletions fhevm-engine/coprocessor/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum CoprocessorError {
input_count: usize,
maximum_allowed: usize,
},
MoreThanMaximumCiphertextsAttemptedToDownload {
input_count: usize,
maximum_allowed: usize,
},
CompactInputCiphertextHasMoreCiphertextThanLimitAllows {
input_blob_index: usize,
input_ciphertexts_in_blob: usize,
Expand Down Expand Up @@ -72,6 +76,12 @@ impl std::fmt::Display for CoprocessorError {
} => {
write!(f, "More than maximum input blobs uploaded, maximum allowed: {maximum_allowed}, uploaded: {input_count}")
}
Self::MoreThanMaximumCiphertextsAttemptedToDownload {
input_count,
maximum_allowed,
} => {
write!(f, "Requested more than maximum ciphertexts allowed to download, maximum allowed: {maximum_allowed}, requested: {input_count}")
}
Self::CompactInputCiphertextHasMoreCiphertextThanLimitAllows {
input_blob_index,
input_ciphertexts_in_blob,
Expand Down
20 changes: 20 additions & 0 deletions proto/coprocessor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ service FhevmCoprocessor {
rpc AsyncCompute (AsyncComputeRequest) returns (GenericResponse) {}
rpc WaitComputations (AsyncComputeRequest) returns (FhevmResponses) {}
rpc UploadInputs (InputUploadBatch) returns (InputUploadResponse) {}
rpc GetCiphertexts (GetCiphertextBatch) returns (GetCiphertextResponse) {}
rpc TrivialEncryptCiphertexts (TrivialEncryptBatch) returns (GenericResponse) {}
}

message GetCiphertextBatch {
repeated bytes handles = 1;
}

message GetCiphertextResponse {
repeated GetCiphertextSingleResponse responses = 1;
}

message GetCiphertextSingleResponse {
bytes handle = 1;
optional FetchedCiphertext ciphertext = 2;
}

message FetchedCiphertext {
bytes ciphertext_bytes = 1;
int32 ciphertext_version = 2;
int32 ciphertext_type = 3;
}

message TrivialEncryptBatch {
repeated TrivialEncryptRequestSingle values = 1;
}
Expand Down
Loading