Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

FEAT multiprocessing on linux #19

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions alpharaw/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!python

import warnings
warnings.filterwarnings("ignore")

def register_readers():
from .ms_data_base import ms_reader_provider
from .legacy_msdata import mgf
Expand Down
43 changes: 22 additions & 21 deletions alpharaw/thermo.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,21 @@ def _import_batch(
isolation_mz_uppers.append(isolation_center + isolation_width / 2)
precursor_charges.append(charge)
rawfile.Close()

# copies of the numpy arrays are needed to move them explicitly onto the CPython heap;
# otherwise mono might interfere with them later
return {
'_peak_indices': _peak_indices,
'peak_mz': np.concatenate(mz_values),
'peak_intensity': np.concatenate(intensity_values),
'rt': np.array(rt_values),
'precursor_mz': np.array(precursor_mz_values),
'precursor_charge': np.array(precursor_charges, dtype=np.int8),
'isolation_lower_mz': np.array(isolation_mz_lowers),
'isolation_upper_mz': np.array(isolation_mz_uppers),
'ms_level': np.array(ms_order_list, dtype=np.int8),
'nce': np.array(ce_list, dtype=np.float32),
'injection_time': np.array(injection_time_list, dtype=np.float32)
'peak_mz': np.concatenate(mz_values).copy(),
'peak_intensity': np.concatenate(intensity_values).copy(),
'rt': np.array(rt_values).copy(),
'precursor_mz': np.array(precursor_mz_values).copy(),
'precursor_charge': np.array(precursor_charges, dtype=np.int8).copy(),
'isolation_lower_mz': np.array(isolation_mz_lowers).copy(),
'isolation_upper_mz': np.array(isolation_mz_uppers).copy(),
'ms_level': np.array(ms_order_list, dtype=np.int8).copy(),
'nce': np.array(ce_list, dtype=np.float32).copy(),
'injection_time': np.array(injection_time_list, dtype=np.float32).copy()
}
class ThermoRawData(MSData_Base):
"""
Expand All @@ -118,7 +121,7 @@ class ThermoRawData(MSData_Base):
def __init__(self,
centroided : bool = True,
process_count : int = 10,
mp_batch_size : int = 10000,
mp_batch_size : int = 5000,
**kwargs):
"""
Parameters
Expand Down Expand Up @@ -148,17 +151,15 @@ def _import(self,
first_spectrum_number = rawfile.FirstSpectrumNumber
last_spectrum_number = rawfile.LastSpectrumNumber

if platform.system() != 'Linux':
batches = np.arange(first_spectrum_number, last_spectrum_number+1, self.mp_batch_size)
batches = np.append(batches, last_spectrum_number+1)

# use multiprocessing to load batches
_import_batch_partial = partial(_import_batch, raw_file_path, self.centroided)
with mp.get_context("spawn").Pool(processes = self.process_count) as pool:
batches = list(tqdm(pool.imap(_import_batch_partial, zip(batches[:-1], batches[1:]))))
mode = 'spawn' if platform.system() != 'Linux' else 'forkserver'

batches = np.arange(first_spectrum_number, last_spectrum_number+1, self.mp_batch_size)
batches = np.append(batches, last_spectrum_number+1)

else:
batches = [_import_batch(raw_file_path, self.centroided, (first_spectrum_number, last_spectrum_number+1))]
# use multiprocessing to load batches
_import_batch_partial = partial(_import_batch, raw_file_path, self.centroided)
with mp.get_context(mode).Pool(processes = self.process_count) as pool:
batches = list(tqdm(pool.imap(_import_batch_partial, zip(batches[:-1], batches[1:]))))

# collect peak indices
_peak_indices = np.concatenate([batch['_peak_indices'] for batch in batches])
Expand Down