TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n'
TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n'  # '##XYDATA= (X++(Y..Y))\n' # noqa


class LCMSComposer(BaseComposer):
    """Compose JCAMP-DX style NTUPLES text from an LC/MS converter.

    ``core`` must expose ``fname`` (used as the title) and ``data``, a mapping
    of retention time to ``{'mz': [...], 'intensities': [...]}``.  The
    composed text is stored in ``self.meta`` as a list of strings, one per
    output line, each ending with a newline.
    """

    def __init__(self, core):
        super().__init__(core)
        self.title = core.fname
        self.meta = self.__compose()

    def __gen_headers_spectrum_orig(self):
        """Return the fixed header lines that open the spectrum section."""
        # NOTE: spectrometer type / inlet / ionization mode and the
        # ##$CSSCAN... / ##$CSTHRESHOLD headers are not emitted for LC/MS.
        return [
            '\n',
            TEXT_SPECTRUM_ORIG,
            '##TITLE={}\n'.format(self.title),
            '##JCAMP-DX=5.00\n',
            '##DATA TYPE=LC/MS\n',
            '##DATA CLASS= NTUPLES\n',
            '##ORIGIN=\n',
            '##OWNER=\n',
            '##SPECTROMETER/DATA SYSTEM=\n',
            '##$CSCATEGORY=SPECTRUM\n',
        ]

    def __gen_ntuples_begin(self):
        """Return the line that opens the NTUPLES section."""
        return ['##NTUPLES=MASS SPECTRUM\n']

    def __gen_ntuples_end(self):
        """Return the line that closes the NTUPLES section."""
        return ['##END NTUPLES=MASS SPECTRUM\n']

    def __gen_config(self):
        """Return the NTUPLES variable description lines."""
        # NOTE: no ##LAST line is written.
        return [
            '##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n',
            '##SYMBOL= X, Y, T\n',
            '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n',
            '##VAR_FORM= AFFN, AFFN, AFFN\n',
            '##VAR_DIM= , , 3\n',
            '##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n',
            '##FIRST= , , 1\n',
        ]

    def __gen_ms_spectra(self):
        """Return one ``##PAGE`` block per retention time as a list of lines.

        Every returned string ends with a newline, so the caller can extend
        ``meta`` with it and the following ``##END NTUPLES`` line starts on
        its own line.  Previously a single string without a trailing newline
        was returned, which ``list.extend`` spliced in character by character
        and which glued the end marker onto the last peak.
        """
        lines = []
        for time, value in self.core.data.items():
            xs, ys = value['mz'], value['intensities']
            lines.append('##PAGE={}\n'.format(time))
            lines.append('##NPOINTS={}\n'.format(len(xs)))
            lines.append(TEXT_MS_DATA_TABLE)
            # One "x, y;" peak per line.
            lines.extend('{}, {};\n'.format(x, y) for x, y in zip(xs, ys))
        return lines

    def __compose(self):
        """Assemble headers, NTUPLES config, all pages and the ending."""
        meta = []
        meta.extend(self.__gen_headers_spectrum_orig())

        meta.extend(self.__gen_ntuples_begin())
        meta.extend(self.__gen_config())
        meta.extend(self.__gen_ms_spectra())
        meta.extend(self.__gen_ntuples_end())

        # gen_ending is not defined in this class; presumably inherited from
        # BaseComposer -- confirm.
        meta.extend(self.gen_ending())
        return meta

    def tf_img(self):
        """Return ``None``: no image is rendered for LC/MS data."""
        return None

    def tf_csv(self):
        """Return ``None``: no CSV export is produced for LC/MS data."""
        return None

    def generate_nmrium(self):
        """Return ``None``: no NMRium output is produced for LC/MS data."""
        return None
class LCMSBaseConverter:
    """Load LC/MS peak data exported as ``MZ_Spectra.csv``.

    After construction ``data`` is ``None`` when no ``target_dir`` was given,
    otherwise a dict keyed by the CSV ``time`` value whose entries are
    ``{'mz': [...], 'intensities': [...]}`` lists for that time.
    """

    SPECTRA_FILE_NAME = 'MZ_Spectra.csv'

    def __init__(self, target_dir, params=False, fname=''):
        """Parse ``params`` and read the spectra found under ``target_dir``.

        Args:
            target_dir: directory holding the extracted LC/MS export, or
                ``None`` to create a converter without data.
            params: raw parameters, forwarded to ``parse_params``.
            fname: name of the uploaded file; stored as ``self.fname``.

        Raises:
            FileNotFoundError: if ``target_dir`` contains no spectra file.
        """
        self.params = parse_params(params)
        self.typ = None
        self.fname = fname
        if target_dir is None:
            self.data = None
        else:
            self.data = self.__read(target_dir, fname)

    @classmethod
    def _locate_spectra_file(cls, target_dir):
        """Return the path of the spectra CSV at or below ``target_dir``."""
        direct_path = os.path.join(target_dir, cls.SPECTRA_FILE_NAME)
        if os.path.isfile(direct_path):
            return direct_path
        # The caller may pass the extraction root while the export sits in a
        # wrapping folder of the archive, so fall back to a recursive search.
        for root, dirnames, filenames in os.walk(target_dir):
            dirnames.sort()  # deterministic traversal order
            if cls.SPECTRA_FILE_NAME in filenames:
                return os.path.join(root, cls.SPECTRA_FILE_NAME)
        raise FileNotFoundError(
            '{} not found under {}'.format(cls.SPECTRA_FILE_NAME, target_dir)
        )

    def __read(self, target_dir, fname):
        """Group the CSV rows by ``time``.

        The CSV must provide ``time``, ``mz`` and ``intensities`` columns.
        ``fname`` is accepted for interface compatibility and is not used.

        Returns:
            dict mapping each distinct ``time`` to
            ``{'mz': list, 'intensities': list}`` in CSV row order.
        """
        spectra_file_path = self._locate_spectra_file(target_dir)
        data_frame = pd.read_csv(spectra_file_path, header=0)
        grouped_df = data_frame.groupby('time').agg(list)
        return {
            time: {'mz': row['mz'], 'intensities': row['intensities']}
            for time, row in grouped_df.iterrows()
        }
def search_lcms_file(td):
    """Look for an LC/MS export (``MZ_Spectra.csv``) below ``td``.

    Returns:
        Whatever ``find_dir(td, 'MZ_Spectra.csv')`` returns (presumably the
        directory containing the file -- ``find_dir`` is defined elsewhere in
        this module, confirm there), or ``False`` if the lookup raises.
    """
    try:
        return find_dir(td, 'MZ_Spectra.csv')
    except Exception:
        # Best-effort probe: any lookup failure means "not an LC/MS upload".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return False
def assertFileType(file, mimeStr):
    """Assert that the MIME type guessed from ``file.name`` is ``mimeStr``."""
    guessed_type, _encoding = mimetypes.guess_type(file.name)
    assert guessed_type == mimeStr


def assertJcampContent(jcamp, field):
    """Assert ``jcamp`` is named like a JCAMP-DX file and contains ``field``.

    Reads the file object from its current position.  Bytes are decoded as
    UTF-8 before the search; calling ``str()`` on bytes would search the
    ``b'...'`` repr instead, where newlines and non-ASCII characters appear
    as escape sequences and can never match ``field``.
    """
    assertFileType(jcamp, 'chemical/x-jcamp-dx')
    raw_content = jcamp.read()
    if isinstance(raw_content, (bytes, bytearray)):
        jcamp_content = raw_content.decode('utf-8', errors='replace')
    else:
        jcamp_content = str(raw_content)
    assert field in jcamp_content


def isBase64(encodeString):
    """Assert that ``encodeString`` survives a base64 decode/encode round trip.

    Despite the predicate-like name this returns ``None`` and fails by
    assertion; ``base64.b64decode`` may also raise on malformed input.
    """
    plainStr = base64.b64decode(encodeString)
    encodedStr = base64.b64encode(plainStr).decode("utf-8")
    assert encodedStr == encodeString
jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=CYCLIC VOLTAMMETRY') + +# def test_bagit_convert_to_jcamp_aif_layout(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(aif_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=SORPTION-DESORPTION MEASUREMENT') + +# def test_bagit_convert_to_jcamp_emissions_layout(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(emissions_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=Emissions') + +# def test_bagit_convert_to_jcamp_dls_acf_layout(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=DLS ACF') + +# def test_bagit_convert_to_jcamp_dls_intensity_layout(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(dls_intensity_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=DLS intensity') + +# def test_bagit_convert_to_images(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(cv_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# assert converter.images is not None +# assert len(converter.images) == 3 +# pngImage = converter.images[0] +# assertFileType(pngImage, 'image/png') + +# def test_bagit_convert_to_csv(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(cv_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# assert converter.list_csv is not None +# assert len(converter.list_csv) == 3 +# csvFile = converter.list_csv[0] +# assertFileType(csvFile, 'text/csv') + +# def 
test_get_base64_data_failed(): +# converter = BagItConveter(None) +# data = converter.get_base64_data() +# assert data is None + +# def test_get_base64_data_succeed(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(cv_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# list_base64 = converter.get_base64_data() +# assert len(list_base64) == 3 +# for base64Str in list_base64: +# isBase64(base64Str) + +# def test_get_combined_image_failed(): +# converter = BagItConveter(None) +# combined_image = converter.combined_image +# assert combined_image is None + +# def test_get_combined_image(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(cv_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# combined_image = converter.combined_image +# assertFileType(combined_image, 'image/png') + +# def test_bagit_has_one_file_no_combined_image(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(dls_acf_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# assert converter.combined_image is None + +# def test_bagit_convert_to_jcamp_dsc_layout(): +# with tempfile.TemporaryDirectory() as td: +# with zipfile.ZipFile(dsc_layout_path, 'r') as z: +# z.extractall(td) + +# converter = BagItConveter(td) +# jcamp = converter.data[0] +# assertJcampContent(jcamp, '##DATA TYPE=DIFFERENTIAL SCANNING CALORIMETRY')