From b54c068ea082f31a99f497799b4ab2988509ee2c Mon Sep 17 00:00:00 2001 From: Lan Le Date: Mon, 2 Sep 2024 11:52:20 +0200 Subject: [PATCH] feat WIP: process tic positive and negative --- chem_spectra/lib/composer/lcms.py | 3 +- chem_spectra/lib/composer/ms_fix.py | 147 ++++++++++++++++++ chem_spectra/lib/converter/lcms/base.py | 15 +- tests/lib/composer/test_lcms_composer.py | 8 - .../lib/converter/lcms/test_lcms_converter.py | 132 +++------------- 5 files changed, 181 insertions(+), 124 deletions(-) create mode 100644 chem_spectra/lib/composer/ms_fix.py diff --git a/chem_spectra/lib/composer/lcms.py b/chem_spectra/lib/composer/lcms.py index 9d401c0..ff4ff24 100644 --- a/chem_spectra/lib/composer/lcms.py +++ b/chem_spectra/lib/composer/lcms.py @@ -56,7 +56,8 @@ def __gen_config(self): def __gen_ms_spectra(self): msspcs = [] ms_tempfile = tempfile.TemporaryFile() - for time, value in self.core.data.items(): + spectra_data = self.core.data[2] # the 1st and 2nd is tic positive and negative + for time, value in spectra_data.items(): xs, ys = value['mz'], value['intensities'] msspc = [ '##PAGE={}\n'.format(time), diff --git a/chem_spectra/lib/composer/ms_fix.py b/chem_spectra/lib/composer/ms_fix.py new file mode 100644 index 0000000..c10ad64 --- /dev/null +++ b/chem_spectra/lib/composer/ms_fix.py @@ -0,0 +1,147 @@ +import matplotlib +matplotlib.use('Agg') + +import tempfile # noqa: E402 +import matplotlib.pyplot as plt # noqa: E402 + +from chem_spectra.lib.composer.base import BaseComposer # noqa: E402 + + +TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n' +TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n' # '##XYDATA= (X++(Y..Y))\n' # noqa + + +class MSComposer(BaseComposer): + def __init__(self, core): + super().__init__(core) + self.title = core.fname + self.meta = self.__compose() + + def __gen_headers_spectrum_orig(self): + return [ + '\n', + TEXT_SPECTRUM_ORIG, + '##TITLE={}\n'.format(self.title), + '##JCAMP-DX=5.00\n', + '##DATA TYPE={}\n'.format('MASS SPECTRUM'), + '##DATA CLASS= NTUPLES\n', + '##ORIGIN=\n', + '##OWNER=\n', + '##SPECTROMETER/DATA SYSTEM=\n', + '##.SPECTROMETER TYPE={}\n'.format(self.core.dic.get('SPECTROMETER TYPE', '')), # TRAP # noqa: E501 + '##.INLET={}\n'.format(self.core.dic.get('INLET', '')), # GC + '##.IONIZATION MODE={}\n'.format(self.core.dic.get('IONIZATION MODE', '')), # EI+ # noqa: E501 + '##$CSCATEGORY=SPECTRUM\n', + '##$CSSCANAUTOTARGET={}\n'.format(self.core.auto_scan), + '##$CSSCANEDITTARGET={}\n'.format( + self.core.edit_scan or self.core.auto_scan + ), + '##$CSSCANCOUNT={}\n'.format(len(self.core.datatables)), + '##$CSTHRESHOLD={}\n'.format(self.core.thres / 100), + ] + + def __gen_ntuples_begin(self): + return ['##NTUPLES={}\n'.format('MASS SPECTRUM')] + + def __gen_ntuples_end(self): + return ['##END NTUPLES={}\n'.format('MASS SPECTRUM')] + + def __gen_config(self): + return [ + '##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n', + '##SYMBOL= X, Y, T\n', + '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', + '##VAR_FORM= AFFN, AFFN, AFFN\n', + '##VAR_DIM= , , 3\n', + '##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n', + '##FIRST= , , 1\n', + '##LAST= , , {}\n'.format(len(self.core.datatables)), + ] + + def __gen_ms_spectra(self): + msspcs = [] + # with open('ms_compose.txt', 'a') as tmpfile: + # for idx, dt in enumerate(self.core.datatables): + # msspc = [ + # '##PAGE={}\n'.format(idx + 1), + # '##NPOINTS={}\n'.format(dt['pts']), + # TEXT_MS_DATA_TABLE, + # ] + # my_content = msspc + dt['dt'] + # tmpfile.write(''.join(my_content)) + + for idx, dt in enumerate(self.core.datatables): + msspc = [ + '##PAGE={}\n'.format(idx + 1), + '##NPOINTS={}\n'.format(dt['pts']), + TEXT_MS_DATA_TABLE, + ] + msspcs = msspcs + msspc + dt['dt'] + # with open('ms_compose.txt', 'r') as tmpfile: + # # msspcs = tmpfile.read() + # lines = tmpfile.readlines() + # msspcs = ''.join(lines) + return msspcs + + def __compose(self): + meta = [] + meta.extend(self.__gen_headers_spectrum_orig()) + + meta.extend(self.__gen_ntuples_begin()) + meta.extend(self.__gen_config()) + meta.extend(self.__gen_ms_spectra()) + meta.extend(self.__gen_ntuples_end()) + + # meta.extend(self.generate_original_metadata()) + + meta.extend(self.gen_ending()) + return meta + + def __prism(self, spc): + blues_x, blues_y, greys_x, greys_y = [], [], [], [] + thres = 0 + if spc.shape[0] > 0: # RESOLVE_VSMBNAN2 + thres = spc[:, 1].max() * (self.core.thres / 100) + + for pt in spc: + x, y = pt[0], pt[1] + if y >= thres: + blues_x.append(x) + blues_y.append(y) + else: + greys_x.append(x) + greys_y.append(y) + return blues_x, blues_y, greys_x, greys_y + + def prism_peaks(self): + idx = (self.core.edit_scan or self.core.auto_scan) - 1 + spc = self.core.spectra[idx] + return self.__prism(spc) + tuple([idx+1]) + + def tf_img(self): + plt.rcParams['figure.figsize'] = [16, 9] + plt.rcParams['font.size'] = 14 + # PLOT data + blues_x, blues_y, greys_x, greys_y, _ = self.prism_peaks() + plt.bar(greys_x, greys_y, width=0, edgecolor='#dddddd') + plt.bar(blues_x, blues_y, width=0, edgecolor='#1f77b4') + + # PLOT label + plt.xlabel('X (m/z)', fontsize=18) + plt.ylabel('Y (Relative Abundance)', fontsize=18) + plt.grid(False) + + # Save + tf = tempfile.NamedTemporaryFile(suffix='.png') + plt.savefig(tf, format='png') + tf.seek(0) + plt.clf() + plt.cla() + return tf + + def tf_csv(self): + return None + + def generate_nmrium(self): + return None + \ No newline at end of file diff --git a/chem_spectra/lib/converter/lcms/base.py b/chem_spectra/lib/converter/lcms/base.py index c860014..e622551 100644 --- a/chem_spectra/lib/converter/lcms/base.py +++ b/chem_spectra/lib/converter/lcms/base.py @@ -13,10 +13,21 @@ def __init__(self, target_dir, params=False, fname=''): self.data = self.__read(target_dir, fname) def __read(self, target_dir, fname): + tic_positive_file_path = os.path.join(target_dir, 'TIC_PLUS.csv') + tic_postive_data = self.__read_tic(tic_positive_file_path) + + tic_negative_file_path = os.path.join(target_dir, 'TIC_MINUS.csv') + tic_negative_data = self.__read_tic(tic_negative_file_path, True) + spectra_file_path = os.path.join(target_dir, 'MZ_Spectra.csv') data_frame = pd.read_csv(spectra_file_path, index_col='time', header=0) grouped_df = data_frame.groupby('time').agg(list) - grouped_dict = {time: {'mz': group['mz'], 'intensities': group['intensities']} for time, group in grouped_df.iterrows()} - return grouped_dict + data_dict = {time: {'mz': group['mz'], 'intensities': group['intensities']} for time, group in grouped_df.iterrows()} + return [tic_postive_data, tic_negative_data, data_dict] + + def __read_tic(self, file_path, is_negative = False): + data_frame = pd.read_csv(file_path, header=0) + tic_postive_data = data_frame.to_dict(orient='list') + return tic_postive_data diff --git a/tests/lib/composer/test_lcms_composer.py b/tests/lib/composer/test_lcms_composer.py index 6dc690a..876ea6b 100644 --- a/tests/lib/composer/test_lcms_composer.py +++ b/tests/lib/composer/test_lcms_composer.py @@ -27,11 +27,3 @@ def test_init_lcms_composer_success(zip_file): assert lcms_composer is not None assert lcms_composer.core == lcms_converter - -# def test_ms_composer_original_metadata(jcamp_file): -# base_converter = JcampBaseConverter(jcamp_file) -# ms_converter = JcampMSConverter(base=base_converter) -# ms_composer = MSComposer(core=ms_converter) - -# assert ms_composer is not None -# assert '$$ === CHEMSPECTRA ORIGINAL METADATA ===\n' in ms_composer.meta diff --git a/tests/lib/converter/lcms/test_lcms_converter.py b/tests/lib/converter/lcms/test_lcms_converter.py index 52dfce5..68ba060 100644 --- a/tests/lib/converter/lcms/test_lcms_converter.py +++ b/tests/lib/converter/lcms/test_lcms_converter.py @@ -1,8 +1,8 @@ -from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter import tempfile import zipfile import mimetypes import base64 +from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter target_dir = './tests/fixtures/source/lcms/lcms.zip' @@ -23,125 +23,31 @@ def test_lcms_converter_failed(): converter = LCMSConveter(None) assert converter.data is None -def test_bagit_convert_to_jcamp(): +def test_lcms_converter_success(): with tempfile.TemporaryDirectory() as td: with zipfile.ZipFile(target_dir, 'r') as z: z.extractall(td) converter = LCMSConveter(td, fname='lcms') assert converter.data is not None - # assert len(converter.data) == 1 - -# def test_bagit_convert_to_jcamp_cv_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(cv_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=CYCLIC VOLTAMMETRY') - -# def test_bagit_convert_to_jcamp_aif_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(aif_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=SORPTION-DESORPTION MEASUREMENT') - -# def test_bagit_convert_to_jcamp_emissions_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(emissions_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=Emissions') - -# def test_bagit_convert_to_jcamp_dls_acf_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=DLS ACF') - -# def test_bagit_convert_to_jcamp_dls_intensity_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(dls_intensity_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=DLS intensity') - -# def test_bagit_convert_to_images(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(cv_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# assert converter.images is not None -# assert len(converter.images) == 3 -# pngImage = converter.images[0] -# assertFileType(pngImage, 'image/png') - -# def test_bagit_convert_to_csv(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(cv_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# assert converter.list_csv is not None -# assert len(converter.list_csv) == 3 -# csvFile = converter.list_csv[0] -# assertFileType(csvFile, 'text/csv') + assert len(converter.data) == 3 -# def test_get_base64_data_failed(): -# converter = BagItConveter(None) -# data = converter.get_base64_data() -# assert data is None - -# def test_get_base64_data_succeed(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(cv_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# list_base64 = converter.get_base64_data() -# assert len(list_base64) == 3 -# for base64Str in list_base64: -# isBase64(base64Str) - -# def test_get_combined_image_failed(): -# converter = BagItConveter(None) -# combined_image = converter.combined_image -# assert combined_image is None - -# def test_get_combined_image(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(cv_layout_path, 'r') as z: -# z.extractall(td) - -# converter = BagItConveter(td) -# combined_image = converter.combined_image -# assertFileType(combined_image, 'image/png') - -# def test_bagit_has_one_file_no_combined_image(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: -# z.extractall(td) +def test_lcms_converter_tic_positive(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) -# converter = BagItConveter(td) -# assert converter.combined_image is None + converter = LCMSConveter(td, fname='lcms') + tic_positive = converter.data[0] + assert len(tic_positive['time']) > 0 + assert len(tic_positive['Intensity']) > 0 -# def test_bagit_convert_to_jcamp_dsc_layout(): -# with tempfile.TemporaryDirectory() as td: -# with zipfile.ZipFile(dsc_layout_path, 'r') as z: -# z.extractall(td) +def test_lcms_converter_tic_negative(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) -# converter = BagItConveter(td) -# jcamp = converter.data[0] -# assertJcampContent(jcamp, '##DATA TYPE=DIFFERENTIAL SCANNING CALORIMETRY') + converter = LCMSConveter(td, fname='lcms') + tic_positive = converter.data[1] + assert len(tic_positive['time']) > 0 + assert len(tic_positive['Intensity']) > 0