Skip to content

Commit

Permalink
Provide backpressure on crypto binary
Browse files Browse the repository at this point in the history
We've been reading from a file faster than we were encrypting. That
caused sender buffer to grow infinitely.
  • Loading branch information
akoshelev committed Dec 9, 2024
1 parent e7a234a commit ff65340
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions ipa-core/src/cli/crypto/hybrid_encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
fs::{read_to_string, File, OpenOptions},
io::{BufWriter, Write},
path::{Path, PathBuf},
sync::mpsc::{channel, Sender},
sync::mpsc::SyncSender,
thread,
thread::JoinHandle,
time::Instant,
Expand Down Expand Up @@ -109,20 +109,20 @@ impl HybridEncryptArgs {
/// A thread-per-core pool responsible for encrypting reports in parallel.
/// This pool is shared across all writers to reduce the number of context switches.
struct EncryptorPool {
pool: Vec<(Sender<EncryptorInput>, JoinHandle<UnitResult>)>,
pool: Vec<(SyncSender<EncryptorInput>, JoinHandle<UnitResult>)>,
next_worker: usize,
}

impl EncryptorPool {
pub fn with_worker_threads(
thread_count: usize,
file_writer: [Sender<EncryptorOutput>; 3],
file_writer: [SyncSender<EncryptorOutput>; 3],
key_registries: [KeyRegistry<PublicKeyOnly>; 3],
) -> Self {
Self {
pool: (0..thread_count)
.map(move |i| {
let (tx, rx) = channel::<EncryptorInput>();
let (tx, rx) = std::sync::mpsc::sync_channel::<EncryptorInput>(65535);
let key_registries = key_registries.clone();
let file_writer = file_writer.clone();
(
Expand Down Expand Up @@ -234,13 +234,13 @@ impl ReportWriter {
/// just the index of file input row that guarantees consistency
/// of shares written across 3 files
struct FileWriteWorker {
sender: Sender<FileWorkerInput>,
sender: SyncSender<FileWorkerInput>,
handle: JoinHandle<UnitResult>,
}

impl FileWriteWorker {
pub fn new(file: File) -> Self {
let (tx, rx) = std::sync::mpsc::channel();
let (tx, rx) = std::sync::mpsc::sync_channel(65535);
Self {
sender: tx,
handle: thread::spawn(move || {
Expand Down

0 comments on commit ff65340

Please sign in to comment.