Skip to content

Commit

Permalink
added logic to process results
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Jan 31, 2024
1 parent ddbf2b1 commit f2add9b
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use bytemuck::bytes_of;
use eyre::{anyhow, Error};
use futures::future;
use memmap::{Mmap, MmapOptions};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
Expand Down Expand Up @@ -85,7 +86,10 @@ impl Coordinator {

handles.push(batch_process_shares_handle);

//TODO: process results Handle each that comes through and calc the min distance and min index
let (min_index, min_distance) =
self.process_results(batch_process_shares_rx).await?;

self.enqueue_distance_shares().await?;

for handle in handles {
handle.await??;
Expand Down Expand Up @@ -210,9 +214,59 @@ impl Coordinator {

pub async fn process_results(
&self,
processed_shares_rx: Receiver<(Vec<[u16; 31]>, Vec<Vec<[u16; 31]>>)>,
) -> eyre::Result<()> {
todo!()
mut processed_shares_rx: Receiver<(
Vec<[u16; 31]>,
Vec<Vec<[u16; 31]>>,
)>,
) -> eyre::Result<(usize, f64)> {
// Keep track of min distance entry.
let mut min_distance = f64::INFINITY;
let mut min_index = usize::MAX;
let mut i = 0;

loop {
// Fetch batches of denominators and shares
let (denom_batch, shares) =
processed_shares_rx.recv().await.unwrap();
let batch_size = denom_batch.len();
if batch_size == 0 {
break;
}

// Compute batch of distances in Rayon
let worker = tokio::task::spawn_blocking(move || {
(0..batch_size)
.into_par_iter()
.map(|i| {
let denominator = denom_batch[i];
let mut numerator = [0_u16; 31];
for share in shares.iter() {
let share = share[i];
for (n, &s) in
numerator.iter_mut().zip(share.iter())
{
*n = n.wrapping_add(s);
}
}
distance::decode_distance(&numerator, &denominator)
})
.collect::<Vec<_>>()
});
let distances = worker.await?;

// Aggregate distances
for (j, distance) in distances.into_iter().enumerate() {
if distance < min_distance {
min_index = i + j;
min_distance = distance;
}
}

// Update counter
i += batch_size;
}

Ok((min_index, min_distance))
}

pub fn initialize_mmap_db(&self) -> Mmap {
Expand Down

0 comments on commit f2add9b

Please sign in to comment.