Skip to content

Commit

Permalink
Revert single thread
Browse files Browse the repository at this point in the history
  • Loading branch information
timoast committed Aug 27, 2024
1 parent 2f7a07e commit 8997fe7
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions src/f2m.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::{
io,
fs,
thread,
sync::mpsc,
path::Path,
error::Error,
fs::File,
Expand Down Expand Up @@ -120,33 +118,33 @@ fn fcount(
// vector of features
// each element is hashmap of cell: count
let mut peak_cell_counts: Vec<FxHashMap<usize, u32>> = vec![FxHashMap::<usize, u32>::default(); total_peaks];

// Create a channel for communication between the decompression and processing threads
let (tx, rx) = mpsc::sync_channel(500);

// Spawn the decompression thread
let frag_file = frag_file.to_path_buf();

let decompress_handle = thread::spawn(move || {
let reader = BufReader::new(MultiGzDecoder::new(File::open(frag_file)
.expect("Failed to open fragment file")));
for line in reader.lines() {
let line = line.expect("Failed to read line");
if tx.send(line).is_err() {
break;
}
}
});

// Processing logic on the main thread
// frag file reading
let frag_file = File::open(frag_file)?;
let mut reader = BufReader::with_capacity(1024 * 1024, MultiGzDecoder::new(frag_file));

let mut line_count: u64 = 0;
let update_interval = 1_000_000;
let mut check_end: bool;
let mut line_str = String::new();
let mut startpos: u32;
let mut endpos: u32;

for line in rx {
loop {

match reader.read_line(&mut line_str) {
Ok(0) => break,
Ok(_) => {},
Err(e) => {
error!("Error reading fragment file: {}", e);
return Err(e);
}
}
let line = &line_str[..line_str.len() - 1];

// Skip header lines that start with #
if line.starts_with('#') {
line_str.clear();
continue;
}

Expand All @@ -170,15 +168,15 @@ fn fcount(
if peaks.contains_key(seqname) {

// try to parse the coordinates, skip the line if parsing fails
let startpos: u32 = match fields[1].trim().parse() {
startpos = match fields[1].trim().parse() {
Ok(num) => num,
Err(e) => {
eprintln!("Failed to parse start position: {:?}. Error: {}", line, e);
continue;
}
};

let endpos: u32 = match fields[2].trim().parse() {
endpos = match fields[2].trim().parse() {
Ok(num) => num,
Err(e) => {
eprintln!("Failed to parse end position: {:?}. Error: {}", line, e);
Expand Down Expand Up @@ -207,12 +205,10 @@ fn fcount(
}
}
}
line_str.clear();
}
eprintln!();

// Wait for the decompression thread to finish
decompress_handle.join().expect("Decompression thread panicked");

// write count matrix
let counts_path = output.join("matrix.mtx.gz");
info!("Writing output counts file: {:?}", &counts_path);
Expand Down

0 comments on commit 8997fe7

Please sign in to comment.