From 8997fe7a65470f1285699416c0d8a9dab4e88b1a Mon Sep 17 00:00:00 2001 From: timoast <4591688+timoast@users.noreply.github.com> Date: Tue, 27 Aug 2024 14:56:33 +0800 Subject: [PATCH] Revert single thread --- src/f2m.rs | 48 ++++++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/src/f2m.rs b/src/f2m.rs index 2ca178a..c79a2d3 100644 --- a/src/f2m.rs +++ b/src/f2m.rs @@ -1,8 +1,6 @@ use std::{ io, fs, - thread, - sync::mpsc, path::Path, error::Error, fs::File, @@ -120,33 +118,33 @@ fn fcount( // vector of features // each element is hashmap of cell: count let mut peak_cell_counts: Vec> = vec![FxHashMap::::default(); total_peaks]; - - // Create a channel for communication between the decompression and processing threads - let (tx, rx) = mpsc::sync_channel(500); - - // Spawn the decompression thread - let frag_file = frag_file.to_path_buf(); - - let decompress_handle = thread::spawn(move || { - let reader = BufReader::new(MultiGzDecoder::new(File::open(frag_file) - .expect("Failed to open fragment file"))); - for line in reader.lines() { - let line = line.expect("Failed to read line"); - if tx.send(line).is_err() { - break; - } - } - }); - // Processing logic on the main thread + // frag file reading + let frag_file = File::open(frag_file)?; + let mut reader = BufReader::with_capacity(1024 * 1024, MultiGzDecoder::new(frag_file)); + let mut line_count: u64 = 0; let update_interval = 1_000_000; let mut check_end: bool; + let mut line_str = String::new(); + let mut startpos: u32; + let mut endpos: u32; - for line in rx { + loop { + + match reader.read_line(&mut line_str) { + Ok(0) => break, + Ok(_) => {}, + Err(e) => { + error!("Error reading fragment file: {}", e); + return Err(e); + } + } + let line = &line_str[..line_str.len() - 1]; // Skip header lines that start with # if line.starts_with('#') { + line_str.clear(); continue; } @@ -170,7 +168,7 @@ fn fcount( if peaks.contains_key(seqname) { // try to parse the coordinates, skip the line if parsing fails - let startpos: u32 = match fields[1].trim().parse() { + startpos = match fields[1].trim().parse() { Ok(num) => num, Err(e) => { eprintln!("Failed to parse start position: {:?}. Error: {}", line, e); @@ -178,7 +176,7 @@ fn fcount( } }; - let endpos: u32 = match fields[2].trim().parse() { + endpos = match fields[2].trim().parse() { Ok(num) => num, Err(e) => { eprintln!("Failed to parse end position: {:?}. Error: {}", line, e); @@ -207,12 +205,10 @@ fn fcount( } } } + line_str.clear(); } eprintln!(); - // Wait for the decompression thread to finish - decompress_handle.join().expect("Decompression thread panicked"); - // write count matrix let counts_path = output.join("matrix.mtx.gz"); info!("Writing output counts file: {:?}", &counts_path);