Skip to content

Commit

Permalink
temporarily updated mmap logic
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune committed Jan 31, 2024
1 parent 7049f92 commit f431fc3
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
use std::collections::BinaryHeap;
use std::io::ErrorKind;
use std::fs::{File, OpenOptions};
use std::net::SocketAddr;
use std::panic::panic_any;
use std::sync::Arc;
use std::time::Duration;

use aws_sdk_sqs::operation::receive_message::builders::ReceiveMessageFluentBuilder;
use aws_sdk_sqs::operation::send_message::builders::SendMessageFluentBuilder;
use bytemuck::bytes_of;
use eyre::{anyhow, Error};
use futures::future;
use memmap::{Mmap, MmapOptions};
use memmap::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{self, mpsc, Mutex};
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;

use crate::bits::Bits;
use crate::distance::{self, Distance, DistanceResults, MasksEngine};
use crate::encoded_bits::EncodedBits;
use crate::template::{self, Template};
use crate::template::Template;

const BATCH_SIZE: usize = 20_000; //TODO: probably make this configurable
const BATCH_SIZE: usize = 20_000; //TODO: make this configurable

pub struct Coordinator {
aws_client: aws_sdk_sqs::Client,
Expand Down Expand Up @@ -68,7 +62,7 @@ impl Coordinator {

//TODO: update error handling
pub async fn spawn(mut self) -> eyre::Result<()> {
let mmap_db: Arc<Mmap> = Arc::new(self.initialize_mmap_db());
let mmap_db: Arc<Mmap> = Arc::new(self.initialize_mmap_db()?);

loop {
if let Some(messages) = self.dequeue_queries().await? {
Expand Down Expand Up @@ -105,7 +99,8 @@ impl Coordinator {
}
}

//TODO: sleep for some amount of time
//TODO: decide how long to sleep
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

Expand Down Expand Up @@ -316,8 +311,17 @@ impl Coordinator {
Ok(distance_results)
}

pub fn initialize_mmap_db(&self) -> Mmap {
todo!()
pub fn initialize_mmap_db(&self) -> eyre::Result<Mmap> {
//TODO: update this
// Try to open the file, or create it if it does not exist
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open("masks")?;

// Memory-map the file
unsafe { Ok(Mmap::map(&file)?) }
}

pub async fn dequeue_queries(
Expand Down

0 comments on commit f431fc3

Please sign in to comment.